1use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{
13 MODEL_FABLE_5, MODEL_OPUS_46, MODEL_OPUS_47, MODEL_OPUS_48, MODEL_SONNET_46,
14 data as anthropic_data,
15};
16use crate::impls::gemini::data::{
17 ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
18 ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
19 convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
20 stream_gemini_response,
21};
22use crate::provider::LlmProvider;
23use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
24use agent_sdk_foundation::llm::{
25 ChatOutcome, ChatRequest, ChatResponse, ResponseFormat, ThinkingConfig, ThinkingMode, Usage,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::StreamExt;
30use reqwest::StatusCode;
31
// Gemini model identifiers published on Vertex AI.

/// Gemini 3 Flash (preview); the model selected by [`VertexProvider::flash`].
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Gemini 3.1 Pro (preview); the model selected by [`VertexProvider::pro`].
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";
/// Gemini 3.0 Pro model id (not selected by `flash`/`pro`).
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";

/// `anthropic_version` value placed in Claude request bodies sent to Vertex AI.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
/// Ceiling applied to the default output-token budget chosen by `default_max_tokens`.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

/// Whole-request timeout, in seconds, set via `RequestBuilder::timeout` on the
/// non-streaming chat calls; the streaming calls do not set one.
const CHAT_READ_TIMEOUT_SECS: u64 = 300;
/// TCP connect timeout, in seconds, for the HTTP client built in `VertexProvider::new`.
const CONNECT_TIMEOUT_SECS: u64 = 30;
/// TCP keepalive duration, in seconds, for the HTTP client built in `VertexProvider::new`.
const TCP_KEEPALIVE_SECS: u64 = 30;
50
51const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
52 anthropic_data::ApiCacheControl::ephemeral_with_ttl(None)
53}
54
55fn build_vertex_claude_tools(request: &ChatRequest) -> Option<Vec<anthropic_data::ApiTool>> {
58 anthropic_data::build_api_tools_with_cache(request, Some(vertex_cache_control()))
59}
60
61#[derive(Clone)]
70pub struct VertexProvider {
71 client: reqwest::Client,
72 access_token: String,
73 project_id: String,
74 region: String,
75 model: String,
76 thinking: Option<ThinkingConfig>,
77}
78
79impl VertexProvider {
80 #[must_use]
82 pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
83 let client = reqwest::Client::builder()
84 .connect_timeout(std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS))
85 .tcp_keepalive(std::time::Duration::from_secs(TCP_KEEPALIVE_SECS))
86 .build()
87 .unwrap_or_else(|error| {
88 log::warn!(
89 "failed to build Vertex HTTP client with timeouts ({error}); using default client"
90 );
91 reqwest::Client::new()
92 });
93
94 Self {
95 client,
96 access_token,
97 project_id,
98 region,
99 model,
100 thinking: None,
101 }
102 }
103
104 #[must_use]
106 pub fn flash(access_token: String, project_id: String, region: String) -> Self {
107 Self::new(
108 access_token,
109 project_id,
110 region,
111 MODEL_GEMINI_3_FLASH.to_owned(),
112 )
113 }
114
115 #[must_use]
117 pub fn pro(access_token: String, project_id: String, region: String) -> Self {
118 Self::new(
119 access_token,
120 project_id,
121 region,
122 MODEL_GEMINI_31_PRO.to_owned(),
123 )
124 }
125
126 fn is_claude_model(&self) -> bool {
128 self.model.starts_with("claude-")
129 }
130
131 fn base_url(&self, publisher: &str) -> String {
136 let domain = if self.region == "global" {
137 "aiplatform.googleapis.com".to_owned()
138 } else {
139 format!("{}-aiplatform.googleapis.com", self.region)
140 };
141 format!(
142 "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
143 domain = domain,
144 region = self.region,
145 project = self.project_id,
146 publisher = publisher,
147 model = self.model,
148 )
149 }
150
151 #[must_use]
153 pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
154 self.thinking = Some(thinking);
155 self
156 }
157
158 fn requires_anthropic_adaptive_thinking(&self) -> bool {
159 matches!(
160 self.model.as_str(),
161 MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
162 )
163 }
164
165 fn build_cached_vertex_claude_messages(
166 request: &ChatRequest,
167 ) -> Vec<anthropic_data::ApiMessage> {
168 let mut messages = anthropic_data::build_api_messages(request);
169 anthropic_data::apply_cache_control_to_last_user_message(
170 &mut messages,
171 vertex_cache_control(),
172 );
173 messages
174 }
175
176 fn build_vertex_claude_system_prompt(
177 system: &str,
178 ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
179 anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
180 }
181
182 fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
189 if request.max_tokens_explicit {
190 request.max_tokens
191 } else {
192 self.default_max_tokens()
193 }
194 }
195
196 fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
197 let content = anthropic_data::map_content_blocks(api_response.content);
198 let stop_reason = api_response
199 .stop_reason
200 .as_ref()
201 .map(anthropic_data::map_stop_reason);
202
203 ChatResponse {
204 id: api_response.id,
205 content,
206 model: api_response.model,
207 stop_reason,
208 usage: Usage {
209 input_tokens: api_response.usage.total_input_tokens(),
210 output_tokens: api_response.usage.output,
211 cached_input_tokens: api_response.usage.cached_input_tokens(),
212 cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
213 },
214 }
215 }
216}
217
218#[async_trait]
219impl LlmProvider for VertexProvider {
220 async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
221 if self.is_claude_model() {
222 return self.chat_claude(request).await;
223 }
224 self.chat_gemini(request).await
225 }
226
227 fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
228 if self.is_claude_model() {
229 return self.chat_stream_claude(request);
230 }
231 self.chat_stream_gemini(request)
232 }
233
234 fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
235 let Some(thinking) = thinking else {
236 return Ok(());
237 };
238
239 if self
240 .capabilities()
241 .is_some_and(|caps| !caps.supports_thinking)
242 {
243 return Err(anyhow::anyhow!(
244 "thinking is not supported for provider={} model={}",
245 self.provider(),
246 self.model()
247 ));
248 }
249
250 if matches!(thinking.mode, ThinkingMode::Adaptive)
251 && !self
252 .capabilities()
253 .is_some_and(|caps| caps.supports_adaptive_thinking)
254 {
255 return Err(anyhow::anyhow!(
256 "adaptive thinking is not supported for provider={} model={}",
257 self.provider(),
258 self.model()
259 ));
260 }
261
262 if self.is_claude_model()
263 && self.requires_anthropic_adaptive_thinking()
264 && matches!(thinking.mode, ThinkingMode::Enabled { .. })
265 {
266 return Err(anyhow::anyhow!(
267 "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
268 self.provider(),
269 self.model()
270 ));
271 }
272
273 Ok(())
274 }
275
276 fn model(&self) -> &str {
277 &self.model
278 }
279
280 fn provider(&self) -> &'static str {
281 "vertex"
282 }
283
284 fn configured_thinking(&self) -> Option<&ThinkingConfig> {
285 self.thinking.as_ref()
286 }
287
288 fn default_max_tokens(&self) -> u32 {
289 let provider = if self.is_claude_model() {
290 "anthropic"
291 } else {
292 "gemini"
293 };
294 let model_max = self
295 .capabilities()
296 .and_then(|caps| caps.max_output_tokens)
297 .or_else(|| {
298 crate::model_capabilities::default_max_output_tokens(provider, self.model())
299 })
300 .unwrap_or(4096);
301 model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
302 }
303}
304
305impl VertexProvider {
310 #[allow(clippy::too_many_lines)]
311 async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
312 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
313 Ok(thinking) => thinking,
314 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
315 };
316 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
317 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
318 }
319 let contents = build_api_contents(&request.messages);
320 let tools = request
321 .tools
322 .as_ref()
323 .map(|t| convert_tools_to_config(t.clone()));
324 let tool_config = request
325 .tool_choice
326 .as_ref()
327 .map(ApiFunctionCallingConfig::from_tool_choice);
328 let system_instruction = if request.system.is_empty() {
329 None
330 } else {
331 Some(ApiContent {
332 role: None,
333 parts: vec![ApiPart::Text {
334 text: request.system.clone(),
335 thought_signature: None,
336 }],
337 })
338 };
339
340 let thinking_config = thinking.as_ref().map(map_thinking_config);
341 let (response_mime_type, response_schema) =
342 request.response_format.as_ref().map_or((None, None), |rf| {
343 (
344 Some("application/json"),
345 Some(gemini_response_schema(&rf.schema)),
346 )
347 });
348
349 let max_tokens = self.effective_max_tokens(&request);
350 let api_request = ApiGenerateContentRequest {
351 contents: &contents,
352 system_instruction: system_instruction.as_ref(),
353 tools: tools.as_ref().map(std::slice::from_ref),
354 tool_config,
355 generation_config: Some(ApiGenerationConfig {
356 max_output_tokens: Some(max_tokens),
357 thinking_config,
358 response_mime_type,
359 response_schema,
360 }),
361 cached_content: request.cached_content.as_deref(),
362 };
363
364 log::debug!(
365 "Vertex AI LLM request model={} max_tokens={}",
366 self.model,
367 max_tokens
368 );
369
370 let url = format!("{}:generateContent", self.base_url("google"));
371
372 let response = self
373 .client
374 .post(&url)
375 .header("Content-Type", "application/json")
376 .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
377 .bearer_auth(&self.access_token)
378 .json(&api_request)
379 .send()
380 .await
381 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
382
383 let status = response.status();
384 let retry_after = if status == StatusCode::TOO_MANY_REQUESTS {
386 crate::http::retry_after_from_headers(response.headers())
387 } else {
388 None
389 };
390 let bytes = response
391 .bytes()
392 .await
393 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
394
395 log::debug!(
396 "Vertex AI LLM response status={} body_len={}",
397 status,
398 bytes.len()
399 );
400
401 if status == StatusCode::TOO_MANY_REQUESTS {
402 return Ok(ChatOutcome::RateLimited(retry_after));
403 }
404
405 if status.is_server_error() {
406 let body = String::from_utf8_lossy(&bytes);
407 log::error!("Vertex AI server error status={status} body={body}");
408 return Ok(ChatOutcome::ServerError(body.into_owned()));
409 }
410
411 if status.is_client_error() {
412 let body = String::from_utf8_lossy(&bytes);
413 log::warn!("Vertex AI client error status={status} body={body}");
414 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
415 }
416
417 let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
418 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
419
420 let candidate = api_response
421 .candidates
422 .into_iter()
423 .next()
424 .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
425
426 let content = build_content_blocks(&candidate.content);
427
428 if content.is_empty() && !candidate.content.parts.is_empty() {
429 log::warn!(
430 "Vertex AI parts not converted to content blocks raw_parts={:?}",
431 candidate.content.parts
432 );
433 }
434
435 let has_tool_calls = content
436 .iter()
437 .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
438
439 let stop_reason = candidate
440 .finish_reason
441 .as_ref()
442 .map(|r| map_finish_reason(r, has_tool_calls));
443
444 let usage = api_response
445 .usage_metadata
446 .unwrap_or(ApiUsageMetadata {
447 prompt: 0,
448 candidates: 0,
449 cached_content: 0,
450 })
451 .into_usage();
452
453 Ok(ChatOutcome::Success(ChatResponse {
454 id: String::new(),
455 content,
456 model: self.model.clone(),
457 stop_reason,
458 usage,
459 }))
460 }
461
462 fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
463 Box::pin(async_stream::stream! {
464 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
465 Ok(thinking) => thinking,
466 Err(error) => {
467 yield Ok(StreamDelta::Error {
468 message: error.to_string(),
469 kind: StreamErrorKind::InvalidRequest,
470 });
471 return;
472 }
473 };
474 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
475 yield Ok(StreamDelta::Error {
476 message: error.to_string(),
477 kind: StreamErrorKind::InvalidRequest,
478 });
479 return;
480 }
481
482 let contents = build_api_contents(&request.messages);
483 let tools = request
484 .tools
485 .as_ref()
486 .map(|t| convert_tools_to_config(t.clone()));
487 let tool_config = request
488 .tool_choice
489 .as_ref()
490 .map(ApiFunctionCallingConfig::from_tool_choice);
491 let system_instruction = build_gemini_system_instruction(&request.system);
492 let thinking_config = thinking.as_ref().map(map_thinking_config);
493 let (response_mime_type, response_schema) =
494 gemini_response_format(request.response_format.as_ref());
495
496 let max_tokens = self.effective_max_tokens(&request);
497 let api_request = ApiGenerateContentRequest {
498 contents: &contents,
499 system_instruction: system_instruction.as_ref(),
500 tools: tools.as_ref().map(std::slice::from_ref),
501 tool_config,
502 generation_config: Some(ApiGenerationConfig {
503 max_output_tokens: Some(max_tokens),
504 thinking_config,
505 response_mime_type,
506 response_schema,
507 }),
508 cached_content: request.cached_content.as_deref(),
509 };
510
511 log::debug!(
512 "Vertex AI streaming LLM request model={} max_tokens={}",
513 self.model,
514 max_tokens
515 );
516
517 let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
518
519 let response = match self.send_gemini_stream_request(&url, &api_request).await {
520 Ok(response) => response,
521 Err(item) => {
522 yield item;
523 return;
524 }
525 };
526
527 let mut inner = stream_gemini_response(response);
528 while let Some(item) = futures::StreamExt::next(&mut inner).await {
529 yield item;
530 }
531 })
532 }
533
534 async fn send_gemini_stream_request(
542 &self,
543 url: &str,
544 api_request: &ApiGenerateContentRequest<'_>,
545 ) -> Result<reqwest::Response, anyhow::Result<StreamDelta>> {
546 let response = match self
547 .client
548 .post(url)
549 .header("Content-Type", "application/json")
550 .bearer_auth(&self.access_token)
551 .json(api_request)
552 .send()
553 .await
554 {
555 Ok(response) => response,
556 Err(e) => return Err(Err(anyhow::anyhow!("request failed: {e}"))),
558 };
559
560 let status = response.status();
561 if !status.is_success() {
562 let body = response.text().await.unwrap_or_default();
563 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
564 StreamErrorKind::RateLimited
565 } else if status.is_server_error() {
566 StreamErrorKind::ServerError
567 } else {
568 StreamErrorKind::InvalidRequest
569 };
570 log::warn!("Vertex AI error status={status} body={body}");
571 return Err(Ok(StreamDelta::Error {
572 message: body,
573 kind,
574 }));
575 }
576
577 Ok(response)
578 }
579}
580
581fn build_gemini_system_instruction(system: &str) -> Option<ApiContent> {
584 if system.is_empty() {
585 None
586 } else {
587 Some(ApiContent {
588 role: None,
589 parts: vec![ApiPart::Text {
590 text: system.to_owned(),
591 thought_signature: None,
592 }],
593 })
594 }
595}
596
597fn gemini_response_format(
600 response_format: Option<&ResponseFormat>,
601) -> (Option<&'static str>, Option<serde_json::Value>) {
602 response_format.map_or((None, None), |rf| {
603 (
604 Some("application/json"),
605 Some(gemini_response_schema(&rf.schema)),
606 )
607 })
608}
609
610impl VertexProvider {
615 async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
616 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
617 Ok(thinking) => thinking,
618 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
619 };
620 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
621 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
622 }
623 let messages = Self::build_cached_vertex_claude_messages(&request);
624 let tools = build_vertex_claude_tools(&request);
625 let thinking = thinking_config
626 .as_ref()
627 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
628 let output_config = thinking_config
629 .as_ref()
630 .and_then(|t| t.effort)
631 .map(|effort| anthropic_data::ApiOutputConfig { effort });
632 let system = Self::build_vertex_claude_system_prompt(&request.system);
633 let tool_choice = request
634 .tool_choice
635 .as_ref()
636 .map(anthropic_data::ApiToolChoice::from_tool_choice);
637
638 let max_tokens = self.effective_max_tokens(&request);
639 let api_request = anthropic_data::ApiMessagesRequest {
640 model: None, max_tokens,
642 system,
643 messages: &messages,
644 tools: tools.as_deref(),
645 tool_choice,
646 stream: false,
647 thinking,
648 output_config,
649 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
650 };
651
652 log::debug!(
653 "Vertex AI (Claude) LLM request model={} max_tokens={}",
654 self.model,
655 max_tokens
656 );
657
658 if log::log_enabled!(log::Level::Debug) {
659 match serde_json::to_string_pretty(&api_request) {
660 Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
661 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
662 }
663 }
664
665 let url = format!("{}:rawPredict", self.base_url("anthropic"));
666
667 let response = self
668 .client
669 .post(&url)
670 .header("Content-Type", "application/json")
671 .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
672 .bearer_auth(&self.access_token)
673 .json(&api_request)
674 .send()
675 .await
676 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
677
678 let status = response.status();
679 let retry_after = if status == StatusCode::TOO_MANY_REQUESTS {
681 crate::http::retry_after_from_headers(response.headers())
682 } else {
683 None
684 };
685 let bytes = response
686 .bytes()
687 .await
688 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
689
690 log::debug!(
691 "Vertex AI (Claude) response status={} body_len={}",
692 status,
693 bytes.len()
694 );
695
696 if status == StatusCode::TOO_MANY_REQUESTS {
697 return Ok(ChatOutcome::RateLimited(retry_after));
698 }
699
700 if status.is_server_error() {
701 let body = String::from_utf8_lossy(&bytes);
702 log::error!("Vertex AI (Claude) server error status={status} body={body}");
703 return Ok(ChatOutcome::ServerError(body.into_owned()));
704 }
705
706 if status.is_client_error() {
707 let body = String::from_utf8_lossy(&bytes);
708 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
709 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
710 }
711
712 let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
713 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
714
715 log::debug!(
716 "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
717 api_response.id,
718 api_response.model,
719 api_response.stop_reason,
720 api_response.usage.total_input_tokens(),
721 api_response.usage.output,
722 api_response.content.len()
723 );
724
725 Ok(ChatOutcome::Success(Self::map_claude_response(
726 api_response,
727 )))
728 }
729
730 #[allow(clippy::too_many_lines)]
731 fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
732 Box::pin(async_stream::stream! {
733 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
734 Ok(thinking) => thinking,
735 Err(error) => {
736 yield Ok(StreamDelta::Error {
737 message: error.to_string(),
738 kind: StreamErrorKind::InvalidRequest,
739 });
740 return;
741 }
742 };
743 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
744 yield Ok(StreamDelta::Error {
745 message: error.to_string(),
746 kind: StreamErrorKind::InvalidRequest,
747 });
748 return;
749 }
750 let messages = Self::build_cached_vertex_claude_messages(&request);
751 let tools = build_vertex_claude_tools(&request);
752 let thinking = thinking_config
753 .as_ref()
754 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
755 let output_config = thinking_config
756 .as_ref()
757 .and_then(|t| t.effort)
758 .map(|effort| anthropic_data::ApiOutputConfig { effort });
759 let system = Self::build_vertex_claude_system_prompt(&request.system);
760 let tool_choice = request
761 .tool_choice
762 .as_ref()
763 .map(anthropic_data::ApiToolChoice::from_tool_choice);
764
765 let max_tokens = self.effective_max_tokens(&request);
766 let api_request = anthropic_data::ApiMessagesRequest {
767 model: None, max_tokens,
769 system,
770 messages: &messages,
771 tools: tools.as_deref(),
772 tool_choice,
773 stream: true,
774 thinking,
775 output_config,
776 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
777 };
778
779 log::debug!(
780 "Vertex AI (Claude) streaming request model={} max_tokens={}",
781 self.model,
782 max_tokens
783 );
784
785 if log::log_enabled!(log::Level::Debug) {
786 match serde_json::to_string_pretty(&api_request) {
787 Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
788 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
789 }
790 }
791
792 let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
793
794 let response = match self
795 .client
796 .post(&url)
797 .header("Content-Type", "application/json")
798 .bearer_auth(&self.access_token)
799 .json(&api_request)
800 .send()
801 .await
802 {
803 Ok(r) => r,
804 Err(e) => {
805 yield Err(anyhow::anyhow!("request failed: {e}"));
806 return;
807 }
808 };
809
810 let status = response.status();
811
812 if status == StatusCode::TOO_MANY_REQUESTS {
813 yield Ok(StreamDelta::Error {
814 message: "Rate limited".to_string(),
815 kind: StreamErrorKind::RateLimited,
816 });
817 return;
818 }
819
820 if status.is_server_error() {
821 let body = response.text().await.unwrap_or_default();
822 log::error!("Vertex AI (Claude) server error status={status} body={body}");
823 yield Ok(StreamDelta::Error {
824 message: body,
825 kind: StreamErrorKind::ServerError,
826 });
827 return;
828 }
829
830 if status.is_client_error() {
831 let body = response.text().await.unwrap_or_default();
832 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
833 yield Ok(StreamDelta::Error {
834 message: body,
835 kind: StreamErrorKind::InvalidRequest,
836 });
837 return;
838 }
839
840 let mut stream = response.bytes_stream();
842 let mut buffer = String::new();
843 let mut input_tokens: u32 = 0;
844 let mut output_tokens: u32 = 0;
845 let mut cached_input_tokens: u32 = 0;
846 let mut cache_creation_input_tokens: u32 = 0;
847 let mut tool_ids: std::collections::HashMap<usize, String> =
848 std::collections::HashMap::new();
849 let mut received_message_stop = false;
850 let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
851
852 while let Some(chunk_result) = stream.next().await {
853 let chunk = match chunk_result {
854 Ok(c) => c,
855 Err(e) => {
856 yield Err(anyhow::anyhow!("stream error: {e}"));
858 return;
859 }
860 };
861
862 buffer.push_str(&String::from_utf8_lossy(&chunk));
863
864 while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
866 if anthropic_data::is_message_stop_event(&event_block) {
867 received_message_stop = true;
868 }
869
870 if let Some(delta) = anthropic_data::parse_sse_event(
871 &event_block,
872 &mut input_tokens,
873 &mut output_tokens,
874 &mut cached_input_tokens,
875 &mut cache_creation_input_tokens,
876 &mut tool_ids,
877 &mut pending_stop_reason,
878 ) {
879 yield Ok(delta);
880 }
881 if anthropic_data::is_message_stop_event(&event_block) {
882 yield Ok(StreamDelta::Done {
883 stop_reason: pending_stop_reason.take(),
884 });
885 }
886 }
887 }
888
889 let remaining = buffer.trim();
891 if !remaining.is_empty() {
892 if anthropic_data::is_message_stop_event(remaining) {
893 received_message_stop = true;
894 }
895
896 if let Some(delta) = anthropic_data::parse_sse_event(
897 remaining,
898 &mut input_tokens,
899 &mut output_tokens,
900 &mut cached_input_tokens,
901 &mut cache_creation_input_tokens,
902 &mut tool_ids,
903 &mut pending_stop_reason,
904 ) {
905 yield Ok(delta);
906 }
907 if anthropic_data::is_message_stop_event(remaining) {
908 yield Ok(StreamDelta::Done {
909 stop_reason: pending_stop_reason.take(),
910 });
911 }
912 }
913
914 if !received_message_stop {
915 log::warn!(
916 "Vertex AI (Claude) SSE stream ended without message_stop"
917 );
918 yield Ok(StreamDelta::Error {
919 message: "Stream ended unexpectedly without completion".to_string(),
920 kind: StreamErrorKind::ServerError,
921 });
922 }
923 })
924 }
925}
926
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a provider with a dummy token for the given project, region and model.
    fn vertex(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_owned(),
            project.to_owned(),
            region.to_owned(),
            model.to_owned(),
        )
    }

    /// Asserts that budgeted thinking (`ThinkingConfig::new`) is rejected for
    /// `model`, with an error pointing the caller at `ThinkingConfig::adaptive()`.
    fn assert_rejects_budgeted_thinking(model: &str) {
        let error = vertex("project", "global", model)
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .unwrap_err();
        assert!(error.to_string().contains("ThinkingConfig::adaptive()"));
    }

    /// Minimal single-message request with the given token budget.
    fn request_with_max_tokens(max_tokens: u32, explicit: bool) -> ChatRequest {
        ChatRequest {
            system: String::new(),
            messages: vec![agent_sdk_foundation::llm::Message::user("hi")],
            tools: None,
            max_tokens,
            max_tokens_explicit: explicit,
            session_id: None,
            cached_content: None,
            thinking: None,
            tool_choice: None,
            response_format: None,
            cache: None,
        }
    }

    #[test]
    fn test_new_creates_provider() {
        let provider = vertex("my-project", "us-central1", "custom-model");
        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let provider = VertexProvider::flash(
            "token".to_owned(),
            "my-project".to_owned(),
            "us-central1".to_owned(),
        );
        assert_eq!(provider.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let provider = VertexProvider::pro(
            "token".to_owned(),
            "my-project".to_owned(),
            "us-central1".to_owned(),
        );
        assert_eq!(provider.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let original = vertex("my-project", "us-central1", "test-model");
        let copy = original.clone();
        assert_eq!(original.model(), copy.model());
        assert_eq!(original.provider(), copy.provider());
    }

    #[test]
    fn test_is_claude_model() {
        assert!(vertex("project", "us-central1", "claude-sonnet-4-20250514").is_claude_model());
        assert!(!vertex("project", "us-central1", "gemini-3-flash-preview").is_claude_model());
    }

    #[test]
    fn test_base_url_gemini() {
        let url = vertex("my-project", "us-central1", "gemini-3-flash-preview").base_url("google");
        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let url =
            vertex("my-project", "us-central1", "claude-sonnet-4-20250514").base_url("anthropic");
        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let url = vertex("other-project", "europe-west4", "gemini-3.1-pro-preview").base_url("google");
        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let url = vertex("my-project", "global", "gemini-3.1-pro-preview").base_url("google");
        assert_eq!(
            url,
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_SONNET_46);
    }

    #[test]
    fn test_vertex_claude_opus_47_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_47);
    }

    #[test]
    fn test_vertex_claude_opus_48_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_48);
    }

    #[test]
    fn test_vertex_claude_fable_5_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_FABLE_5);
    }

    #[test]
    fn test_model_constants() {
        assert_eq!(MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview");
        assert_eq!(MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview");
        assert_eq!(MODEL_GEMINI_3_PRO, "gemini-3.0-pro");
    }

    #[test]
    fn test_effective_max_tokens_honors_explicit_budget() {
        let provider = vertex("project", "global", MODEL_SONNET_46);
        let request = request_with_max_tokens(1234, true);
        assert_eq!(provider.effective_max_tokens(&request), 1234);
    }

    #[test]
    fn test_effective_max_tokens_uses_clamped_default_when_implicit() {
        let provider = vertex("project", "global", MODEL_SONNET_46);
        let request = request_with_max_tokens(4096, false);
        let effective = provider.effective_max_tokens(&request);
        assert_eq!(effective, provider.default_max_tokens());
        assert!(effective <= DEFAULT_SAFE_MAX_OUTPUT_TOKENS);
    }
}