1use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{MODEL_OPUS_46, MODEL_SONNET_46, data as anthropic_data};
13use crate::impls::gemini::data::{
14 ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
15 ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
16 convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
17 stream_gemini_response,
18};
19use crate::provider::LlmProvider;
20use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
21use agent_sdk_foundation::llm::{
22 ChatOutcome, ChatRequest, ChatResponse, ThinkingConfig, ThinkingMode, Usage,
23};
24use anyhow::Result;
25use async_trait::async_trait;
26use futures::StreamExt;
27use reqwest::StatusCode;
28
/// Vertex AI model id for Gemini 3 Flash (preview); the model used by
/// [`VertexProvider::flash`].
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Vertex AI model id for Gemini 3.1 Pro (preview); the model used by
/// [`VertexProvider::pro`].
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";

/// Vertex AI model id for Gemini 3.0 Pro.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";
34
35const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
37const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;
38
39const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
40 anthropic_data::ApiCacheControl::ephemeral()
41}
42
43#[derive(Clone)]
52pub struct VertexProvider {
53 client: reqwest::Client,
54 access_token: String,
55 project_id: String,
56 region: String,
57 model: String,
58 thinking: Option<ThinkingConfig>,
59}
60
61impl VertexProvider {
62 #[must_use]
64 pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
65 let client = reqwest::Client::builder()
66 .connect_timeout(std::time::Duration::from_secs(30))
67 .tcp_keepalive(std::time::Duration::from_secs(30))
68 .build()
69 .unwrap_or_default();
70
71 Self {
72 client,
73 access_token,
74 project_id,
75 region,
76 model,
77 thinking: None,
78 }
79 }
80
81 #[must_use]
83 pub fn flash(access_token: String, project_id: String, region: String) -> Self {
84 Self::new(
85 access_token,
86 project_id,
87 region,
88 MODEL_GEMINI_3_FLASH.to_owned(),
89 )
90 }
91
92 #[must_use]
94 pub fn pro(access_token: String, project_id: String, region: String) -> Self {
95 Self::new(
96 access_token,
97 project_id,
98 region,
99 MODEL_GEMINI_31_PRO.to_owned(),
100 )
101 }
102
103 fn is_claude_model(&self) -> bool {
105 self.model.starts_with("claude-")
106 }
107
108 fn base_url(&self, publisher: &str) -> String {
113 let domain = if self.region == "global" {
114 "aiplatform.googleapis.com".to_owned()
115 } else {
116 format!("{}-aiplatform.googleapis.com", self.region)
117 };
118 format!(
119 "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
120 domain = domain,
121 region = self.region,
122 project = self.project_id,
123 publisher = publisher,
124 model = self.model,
125 )
126 }
127
128 #[must_use]
130 pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
131 self.thinking = Some(thinking);
132 self
133 }
134
135 fn requires_anthropic_adaptive_thinking(&self) -> bool {
136 matches!(self.model.as_str(), MODEL_SONNET_46 | MODEL_OPUS_46)
137 }
138
139 fn build_cached_vertex_claude_messages(
140 request: &ChatRequest,
141 ) -> Vec<anthropic_data::ApiMessage> {
142 let mut messages = anthropic_data::build_api_messages(request);
143 anthropic_data::apply_cache_control_to_last_user_message(
144 &mut messages,
145 vertex_cache_control(),
146 );
147 messages
148 }
149
150 fn build_vertex_claude_system_prompt(
151 system: &str,
152 ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
153 anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
154 }
155
156 fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
157 let content = anthropic_data::map_content_blocks(api_response.content);
158 let stop_reason = api_response
159 .stop_reason
160 .as_ref()
161 .map(anthropic_data::map_stop_reason);
162
163 ChatResponse {
164 id: api_response.id,
165 content,
166 model: api_response.model,
167 stop_reason,
168 usage: Usage {
169 input_tokens: api_response.usage.total_input_tokens(),
170 output_tokens: api_response.usage.output,
171 cached_input_tokens: api_response.usage.cached_input_tokens(),
172 cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
173 },
174 }
175 }
176}
177
178#[async_trait]
179impl LlmProvider for VertexProvider {
180 async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
181 if self.is_claude_model() {
182 return self.chat_claude(request).await;
183 }
184 self.chat_gemini(request).await
185 }
186
187 fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
188 if self.is_claude_model() {
189 return self.chat_stream_claude(request);
190 }
191 self.chat_stream_gemini(request)
192 }
193
194 fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
195 let Some(thinking) = thinking else {
196 return Ok(());
197 };
198
199 if self
200 .capabilities()
201 .is_some_and(|caps| !caps.supports_thinking)
202 {
203 return Err(anyhow::anyhow!(
204 "thinking is not supported for provider={} model={}",
205 self.provider(),
206 self.model()
207 ));
208 }
209
210 if matches!(thinking.mode, ThinkingMode::Adaptive)
211 && !self
212 .capabilities()
213 .is_some_and(|caps| caps.supports_adaptive_thinking)
214 {
215 return Err(anyhow::anyhow!(
216 "adaptive thinking is not supported for provider={} model={}",
217 self.provider(),
218 self.model()
219 ));
220 }
221
222 if self.is_claude_model()
223 && self.requires_anthropic_adaptive_thinking()
224 && matches!(thinking.mode, ThinkingMode::Enabled { .. })
225 {
226 return Err(anyhow::anyhow!(
227 "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
228 self.provider(),
229 self.model()
230 ));
231 }
232
233 Ok(())
234 }
235
236 fn model(&self) -> &str {
237 &self.model
238 }
239
240 fn provider(&self) -> &'static str {
241 "vertex"
242 }
243
244 fn configured_thinking(&self) -> Option<&ThinkingConfig> {
245 self.thinking.as_ref()
246 }
247
248 fn default_max_tokens(&self) -> u32 {
249 let provider = if self.is_claude_model() {
250 "anthropic"
251 } else {
252 "gemini"
253 };
254 let model_max = self
255 .capabilities()
256 .and_then(|caps| caps.max_output_tokens)
257 .or_else(|| {
258 crate::model_capabilities::default_max_output_tokens(provider, self.model())
259 })
260 .unwrap_or(4096);
261 model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
262 }
263}
264
265impl VertexProvider {
270 #[allow(clippy::too_many_lines)]
271 async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
272 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
273 Ok(thinking) => thinking,
274 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
275 };
276 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
277 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
278 }
279 let contents = build_api_contents(&request.messages);
280 let tools = request.tools.map(convert_tools_to_config);
281 let tool_config = request
282 .tool_choice
283 .as_ref()
284 .map(ApiFunctionCallingConfig::from_tool_choice);
285 let system_instruction = if request.system.is_empty() {
286 None
287 } else {
288 Some(ApiContent {
289 role: None,
290 parts: vec![ApiPart::Text {
291 text: request.system.clone(),
292 thought_signature: None,
293 }],
294 })
295 };
296
297 let thinking_config = thinking.as_ref().map(map_thinking_config);
298 let (response_mime_type, response_schema) =
299 request.response_format.as_ref().map_or((None, None), |rf| {
300 (
301 Some("application/json"),
302 Some(gemini_response_schema(&rf.schema)),
303 )
304 });
305
306 let api_request = ApiGenerateContentRequest {
307 contents: &contents,
308 system_instruction: system_instruction.as_ref(),
309 tools: tools.as_ref().map(std::slice::from_ref),
310 tool_config,
311 generation_config: Some(ApiGenerationConfig {
312 max_output_tokens: Some(request.max_tokens),
313 thinking_config,
314 response_mime_type,
315 response_schema,
316 }),
317 cached_content: request.cached_content.as_deref(),
318 };
319
320 log::debug!(
321 "Vertex AI LLM request model={} max_tokens={}",
322 self.model,
323 request.max_tokens
324 );
325
326 let url = format!("{}:generateContent", self.base_url("google"));
327
328 let response = self
329 .client
330 .post(&url)
331 .header("Content-Type", "application/json")
332 .bearer_auth(&self.access_token)
333 .json(&api_request)
334 .send()
335 .await
336 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
337
338 let status = response.status();
339 let bytes = response
340 .bytes()
341 .await
342 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
343
344 log::debug!(
345 "Vertex AI LLM response status={} body_len={}",
346 status,
347 bytes.len()
348 );
349
350 if status == StatusCode::TOO_MANY_REQUESTS {
351 return Ok(ChatOutcome::RateLimited);
352 }
353
354 if status.is_server_error() {
355 let body = String::from_utf8_lossy(&bytes);
356 log::error!("Vertex AI server error status={status} body={body}");
357 return Ok(ChatOutcome::ServerError(body.into_owned()));
358 }
359
360 if status.is_client_error() {
361 let body = String::from_utf8_lossy(&bytes);
362 log::warn!("Vertex AI client error status={status} body={body}");
363 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
364 }
365
366 let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
367 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
368
369 let candidate = api_response
370 .candidates
371 .into_iter()
372 .next()
373 .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
374
375 let content = build_content_blocks(&candidate.content);
376
377 if content.is_empty() && !candidate.content.parts.is_empty() {
378 log::warn!(
379 "Vertex AI parts not converted to content blocks raw_parts={:?}",
380 candidate.content.parts
381 );
382 }
383
384 let has_tool_calls = content
385 .iter()
386 .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
387
388 let stop_reason = candidate
389 .finish_reason
390 .as_ref()
391 .map(|r| map_finish_reason(r, has_tool_calls));
392
393 let usage = api_response
394 .usage_metadata
395 .unwrap_or(ApiUsageMetadata {
396 prompt: 0,
397 candidates: 0,
398 cached_content: 0,
399 })
400 .into_usage();
401
402 Ok(ChatOutcome::Success(ChatResponse {
403 id: String::new(),
404 content,
405 model: self.model.clone(),
406 stop_reason,
407 usage,
408 }))
409 }
410
411 fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
412 Box::pin(async_stream::stream! {
413 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
414 Ok(thinking) => thinking,
415 Err(error) => {
416 yield Ok(StreamDelta::Error {
417 message: error.to_string(),
418 kind: StreamErrorKind::InvalidRequest,
419 });
420 return;
421 }
422 };
423 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
424 yield Ok(StreamDelta::Error {
425 message: error.to_string(),
426 kind: StreamErrorKind::InvalidRequest,
427 });
428 return;
429 }
430 let contents = build_api_contents(&request.messages);
431 let tools = request.tools.map(convert_tools_to_config);
432 let tool_config = request
433 .tool_choice
434 .as_ref()
435 .map(ApiFunctionCallingConfig::from_tool_choice);
436 let system_instruction = if request.system.is_empty() {
437 None
438 } else {
439 Some(ApiContent {
440 role: None,
441 parts: vec![ApiPart::Text {
442 text: request.system.clone(),
443 thought_signature: None,
444 }],
445 })
446 };
447
448 let thinking_config = thinking.as_ref().map(map_thinking_config);
449 let (response_mime_type, response_schema) = request
450 .response_format
451 .as_ref()
452 .map_or((None, None), |rf| {
453 (
454 Some("application/json"),
455 Some(gemini_response_schema(&rf.schema)),
456 )
457 });
458
459 let api_request = ApiGenerateContentRequest {
460 contents: &contents,
461 system_instruction: system_instruction.as_ref(),
462 tools: tools.as_ref().map(std::slice::from_ref),
463 tool_config,
464 generation_config: Some(ApiGenerationConfig {
465 max_output_tokens: Some(request.max_tokens),
466 thinking_config,
467 response_mime_type,
468 response_schema,
469 }),
470 cached_content: request.cached_content.as_deref(),
471 };
472
473 log::debug!(
474 "Vertex AI streaming LLM request model={} max_tokens={}",
475 self.model,
476 request.max_tokens
477 );
478
479 let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
480
481 let Ok(response) = self
482 .client
483 .post(&url)
484 .header("Content-Type", "application/json")
485 .bearer_auth(&self.access_token)
486 .json(&api_request)
487 .send()
488 .await
489 else {
490 yield Err(anyhow::anyhow!("request failed"));
491 return;
492 };
493
494 let status = response.status();
495 if !status.is_success() {
496 let body = response.text().await.unwrap_or_default();
497 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
498 StreamErrorKind::RateLimited
499 } else if status.is_server_error() {
500 StreamErrorKind::ServerError
501 } else {
502 StreamErrorKind::InvalidRequest
503 };
504 log::warn!("Vertex AI error status={status} body={body}");
505 yield Ok(StreamDelta::Error {
506 message: body,
507 kind,
508 });
509 return;
510 }
511
512 let mut inner = stream_gemini_response(response);
513 while let Some(item) = futures::StreamExt::next(&mut inner).await {
514 yield item;
515 }
516 })
517 }
518}
519
520impl VertexProvider {
525 async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
526 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
527 Ok(thinking) => thinking,
528 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
529 };
530 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
531 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
532 }
533 let messages = Self::build_cached_vertex_claude_messages(&request);
534 let tools = anthropic_data::build_api_tools(&request);
535 let thinking = thinking_config
536 .as_ref()
537 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
538 let output_config = thinking_config
539 .as_ref()
540 .and_then(|t| t.effort)
541 .map(|effort| anthropic_data::ApiOutputConfig { effort });
542 let system = Self::build_vertex_claude_system_prompt(&request.system);
543 let tool_choice = request
544 .tool_choice
545 .as_ref()
546 .map(anthropic_data::ApiToolChoice::from_tool_choice);
547
548 let api_request = anthropic_data::ApiMessagesRequest {
549 model: None, max_tokens: request.max_tokens,
551 system,
552 messages: &messages,
553 tools: tools.as_deref(),
554 tool_choice,
555 stream: false,
556 thinking,
557 output_config,
558 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
559 };
560
561 log::debug!(
562 "Vertex AI (Claude) LLM request model={} max_tokens={}",
563 self.model,
564 request.max_tokens
565 );
566
567 if log::log_enabled!(log::Level::Debug) {
568 match serde_json::to_string_pretty(&api_request) {
569 Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
570 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
571 }
572 }
573
574 let url = format!("{}:rawPredict", self.base_url("anthropic"));
575
576 let response = self
577 .client
578 .post(&url)
579 .header("Content-Type", "application/json")
580 .bearer_auth(&self.access_token)
581 .json(&api_request)
582 .send()
583 .await
584 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
585
586 let status = response.status();
587 let bytes = response
588 .bytes()
589 .await
590 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
591
592 log::debug!(
593 "Vertex AI (Claude) response status={} body_len={}",
594 status,
595 bytes.len()
596 );
597
598 if status == StatusCode::TOO_MANY_REQUESTS {
599 return Ok(ChatOutcome::RateLimited);
600 }
601
602 if status.is_server_error() {
603 let body = String::from_utf8_lossy(&bytes);
604 log::error!("Vertex AI (Claude) server error status={status} body={body}");
605 return Ok(ChatOutcome::ServerError(body.into_owned()));
606 }
607
608 if status.is_client_error() {
609 let body = String::from_utf8_lossy(&bytes);
610 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
611 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
612 }
613
614 let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
615 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
616
617 log::debug!(
618 "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
619 api_response.id,
620 api_response.model,
621 api_response.stop_reason,
622 api_response.usage.total_input_tokens(),
623 api_response.usage.output,
624 api_response.content.len()
625 );
626
627 Ok(ChatOutcome::Success(Self::map_claude_response(
628 api_response,
629 )))
630 }
631
632 #[allow(clippy::too_many_lines)]
633 fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
634 Box::pin(async_stream::stream! {
635 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
636 Ok(thinking) => thinking,
637 Err(error) => {
638 yield Ok(StreamDelta::Error {
639 message: error.to_string(),
640 kind: StreamErrorKind::InvalidRequest,
641 });
642 return;
643 }
644 };
645 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
646 yield Ok(StreamDelta::Error {
647 message: error.to_string(),
648 kind: StreamErrorKind::InvalidRequest,
649 });
650 return;
651 }
652 let messages = Self::build_cached_vertex_claude_messages(&request);
653 let tools = anthropic_data::build_api_tools(&request);
654 let thinking = thinking_config
655 .as_ref()
656 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
657 let output_config = thinking_config
658 .as_ref()
659 .and_then(|t| t.effort)
660 .map(|effort| anthropic_data::ApiOutputConfig { effort });
661 let system = Self::build_vertex_claude_system_prompt(&request.system);
662 let tool_choice = request
663 .tool_choice
664 .as_ref()
665 .map(anthropic_data::ApiToolChoice::from_tool_choice);
666
667 let api_request = anthropic_data::ApiMessagesRequest {
668 model: None, max_tokens: request.max_tokens,
670 system,
671 messages: &messages,
672 tools: tools.as_deref(),
673 tool_choice,
674 stream: true,
675 thinking,
676 output_config,
677 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
678 };
679
680 log::debug!(
681 "Vertex AI (Claude) streaming request model={} max_tokens={}",
682 self.model,
683 request.max_tokens
684 );
685
686 if log::log_enabled!(log::Level::Debug) {
687 match serde_json::to_string_pretty(&api_request) {
688 Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
689 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
690 }
691 }
692
693 let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
694
695 let response = match self
696 .client
697 .post(&url)
698 .header("Content-Type", "application/json")
699 .bearer_auth(&self.access_token)
700 .json(&api_request)
701 .send()
702 .await
703 {
704 Ok(r) => r,
705 Err(e) => {
706 yield Err(anyhow::anyhow!("request failed: {e}"));
707 return;
708 }
709 };
710
711 let status = response.status();
712
713 if status == StatusCode::TOO_MANY_REQUESTS {
714 yield Ok(StreamDelta::Error {
715 message: "Rate limited".to_string(),
716 kind: StreamErrorKind::RateLimited,
717 });
718 return;
719 }
720
721 if status.is_server_error() {
722 let body = response.text().await.unwrap_or_default();
723 log::error!("Vertex AI (Claude) server error status={status} body={body}");
724 yield Ok(StreamDelta::Error {
725 message: body,
726 kind: StreamErrorKind::ServerError,
727 });
728 return;
729 }
730
731 if status.is_client_error() {
732 let body = response.text().await.unwrap_or_default();
733 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
734 yield Ok(StreamDelta::Error {
735 message: body,
736 kind: StreamErrorKind::InvalidRequest,
737 });
738 return;
739 }
740
741 let mut stream = response.bytes_stream();
743 let mut buffer = String::new();
744 let mut input_tokens: u32 = 0;
745 let mut output_tokens: u32 = 0;
746 let mut cached_input_tokens: u32 = 0;
747 let mut cache_creation_input_tokens: u32 = 0;
748 let mut tool_ids: std::collections::HashMap<usize, String> =
749 std::collections::HashMap::new();
750 let mut received_message_stop = false;
751 let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
752
753 while let Some(chunk_result) = stream.next().await {
754 let Ok(chunk) = chunk_result else {
755 yield Err(anyhow::anyhow!("stream error"));
756 return;
757 };
758
759 buffer.push_str(&String::from_utf8_lossy(&chunk));
760
761 while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
763 if anthropic_data::is_message_stop_event(&event_block) {
764 received_message_stop = true;
765 }
766
767 if let Some(delta) = anthropic_data::parse_sse_event(
768 &event_block,
769 &mut input_tokens,
770 &mut output_tokens,
771 &mut cached_input_tokens,
772 &mut cache_creation_input_tokens,
773 &mut tool_ids,
774 &mut pending_stop_reason,
775 ) {
776 yield Ok(delta);
777 }
778 if anthropic_data::is_message_stop_event(&event_block) {
779 yield Ok(StreamDelta::Done {
780 stop_reason: pending_stop_reason.take(),
781 });
782 }
783 }
784 }
785
786 let remaining = buffer.trim();
788 if !remaining.is_empty() {
789 if anthropic_data::is_message_stop_event(remaining) {
790 received_message_stop = true;
791 }
792
793 if let Some(delta) = anthropic_data::parse_sse_event(
794 remaining,
795 &mut input_tokens,
796 &mut output_tokens,
797 &mut cached_input_tokens,
798 &mut cache_creation_input_tokens,
799 &mut tool_ids,
800 &mut pending_stop_reason,
801 ) {
802 yield Ok(delta);
803 }
804 if anthropic_data::is_message_stop_event(remaining) {
805 yield Ok(StreamDelta::Done {
806 stop_reason: pending_stop_reason.take(),
807 });
808 }
809 }
810
811 if !received_message_stop {
812 log::warn!(
813 "Vertex AI (Claude) SSE stream ended without message_stop"
814 );
815 yield Ok(StreamDelta::Error {
816 message: "Stream ended unexpectedly without completion".to_string(),
817 kind: StreamErrorKind::ServerError,
818 });
819 }
820 })
821 }
822}
823
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a provider with a placeholder token for the given project, region and model.
    fn make_provider(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_owned(),
            project.to_owned(),
            region.to_owned(),
            model.to_owned(),
        )
    }

    #[test]
    fn test_new_creates_provider() {
        let provider = make_provider("my-project", "us-central1", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let provider = VertexProvider::flash(
            "token".to_owned(),
            "my-project".to_owned(),
            "us-central1".to_owned(),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let provider = VertexProvider::pro(
            "token".to_owned(),
            "my-project".to_owned(),
            "us-central1".to_owned(),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let original = make_provider("my-project", "us-central1", "test-model");
        let copy = original.clone();

        assert_eq!(original.model(), copy.model());
        assert_eq!(original.provider(), copy.provider());
    }

    #[test]
    fn test_is_claude_model() {
        assert!(make_provider("project", "us-central1", "claude-sonnet-4-20250514").is_claude_model());
        assert!(!make_provider("project", "us-central1", "gemini-3-flash-preview").is_claude_model());
    }

    #[test]
    fn test_base_url_gemini() {
        let provider = make_provider("my-project", "us-central1", "gemini-3-flash-preview");

        assert_eq!(
            provider.base_url("google"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let provider = make_provider("my-project", "us-central1", "claude-sonnet-4-20250514");

        assert_eq!(
            provider.base_url("anthropic"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let url = make_provider("other-project", "europe-west4", "gemini-3.1-pro-preview")
            .base_url("google");

        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let url = make_provider("my-project", "global", "gemini-3.1-pro-preview").base_url("google");

        assert_eq!(
            url,
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        let provider = make_provider("project", "global", MODEL_SONNET_46);

        let message = provider
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err()
            .to_string();
        assert!(message.contains("ThinkingConfig::adaptive()"));
    }

    #[test]
    fn test_model_constants() {
        let expected = [
            (MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview"),
            (MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview"),
            (MODEL_GEMINI_3_PRO, "gemini-3.0-pro"),
        ];
        for (actual, want) in expected {
            assert_eq!(actual, want);
        }
    }
}