1use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{
13 MODEL_FABLE_5, MODEL_OPUS_46, MODEL_OPUS_47, MODEL_OPUS_48, MODEL_SONNET_46,
14 data as anthropic_data,
15};
16use crate::impls::gemini::data::{
17 ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
18 ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
19 convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
20 stream_gemini_response,
21};
22use crate::provider::LlmProvider;
23use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
24use agent_sdk_foundation::llm::{
25 ChatOutcome, ChatRequest, ChatResponse, ResponseFormat, ThinkingConfig, ThinkingMode, Usage,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::StreamExt;
30use reqwest::StatusCode;
31
/// Model identifier selected by `VertexProvider::flash`.
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Model identifier selected by `VertexProvider::pro`.
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";

/// Model identifier string `gemini-3.0-pro`.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";

/// Value sent in the `anthropic_version` field of Claude request bodies.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
/// Upper bound that `default_max_tokens` clamps the per-model output limit to.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

/// Connect timeout, in seconds, configured on the HTTP client in `VertexProvider::new`.
const CONNECT_TIMEOUT_SECS: u64 = 30;
/// TCP keepalive, in seconds, configured on the HTTP client in `VertexProvider::new`.
const TCP_KEEPALIVE_SECS: u64 = 30;
/// Per-request timeout, in seconds, set on the non-streaming chat requests.
const CHAT_READ_TIMEOUT_SECS: u64 = 300;
50
51const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
52 anthropic_data::ApiCacheControl::ephemeral()
53}
54
/// Chat provider that sends requests to Vertex AI publisher-model endpoints.
///
/// Models whose name starts with `claude-` are routed to the `anthropic`
/// publisher endpoints; every other model goes to the `google` publisher
/// endpoints.
#[derive(Clone)]
pub struct VertexProvider {
    /// HTTP client used for every request issued by this provider.
    client: reqwest::Client,
    /// Token sent as the bearer credential on each request.
    access_token: String,
    /// Project segment of the endpoint URL.
    project_id: String,
    /// Location segment of the endpoint URL; `"global"` selects the host without a region prefix.
    region: String,
    /// Model name placed in the endpoint URL and used to choose the Claude or Gemini path.
    model: String,
    /// Thinking configuration stored by `with_thinking`, if any.
    thinking: Option<ThinkingConfig>,
}
72
73impl VertexProvider {
74 #[must_use]
76 pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
77 let client = reqwest::Client::builder()
78 .connect_timeout(std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS))
79 .tcp_keepalive(std::time::Duration::from_secs(TCP_KEEPALIVE_SECS))
80 .build()
81 .unwrap_or_else(|error| {
82 log::warn!(
83 "failed to build Vertex HTTP client with timeouts ({error}); using default client"
84 );
85 reqwest::Client::new()
86 });
87
88 Self {
89 client,
90 access_token,
91 project_id,
92 region,
93 model,
94 thinking: None,
95 }
96 }
97
98 #[must_use]
100 pub fn flash(access_token: String, project_id: String, region: String) -> Self {
101 Self::new(
102 access_token,
103 project_id,
104 region,
105 MODEL_GEMINI_3_FLASH.to_owned(),
106 )
107 }
108
109 #[must_use]
111 pub fn pro(access_token: String, project_id: String, region: String) -> Self {
112 Self::new(
113 access_token,
114 project_id,
115 region,
116 MODEL_GEMINI_31_PRO.to_owned(),
117 )
118 }
119
120 fn is_claude_model(&self) -> bool {
122 self.model.starts_with("claude-")
123 }
124
125 fn base_url(&self, publisher: &str) -> String {
130 let domain = if self.region == "global" {
131 "aiplatform.googleapis.com".to_owned()
132 } else {
133 format!("{}-aiplatform.googleapis.com", self.region)
134 };
135 format!(
136 "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
137 domain = domain,
138 region = self.region,
139 project = self.project_id,
140 publisher = publisher,
141 model = self.model,
142 )
143 }
144
145 #[must_use]
147 pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
148 self.thinking = Some(thinking);
149 self
150 }
151
152 fn requires_anthropic_adaptive_thinking(&self) -> bool {
153 matches!(
154 self.model.as_str(),
155 MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
156 )
157 }
158
159 fn build_cached_vertex_claude_messages(
160 request: &ChatRequest,
161 ) -> Vec<anthropic_data::ApiMessage> {
162 let mut messages = anthropic_data::build_api_messages(request);
163 anthropic_data::apply_cache_control_to_last_user_message(
164 &mut messages,
165 vertex_cache_control(),
166 );
167 messages
168 }
169
170 fn build_vertex_claude_system_prompt(
171 system: &str,
172 ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
173 anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
174 }
175
176 fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
183 if request.max_tokens_explicit {
184 request.max_tokens
185 } else {
186 self.default_max_tokens()
187 }
188 }
189
190 fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
191 let content = anthropic_data::map_content_blocks(api_response.content);
192 let stop_reason = api_response
193 .stop_reason
194 .as_ref()
195 .map(anthropic_data::map_stop_reason);
196
197 ChatResponse {
198 id: api_response.id,
199 content,
200 model: api_response.model,
201 stop_reason,
202 usage: Usage {
203 input_tokens: api_response.usage.total_input_tokens(),
204 output_tokens: api_response.usage.output,
205 cached_input_tokens: api_response.usage.cached_input_tokens(),
206 cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
207 },
208 }
209 }
210}
211
212#[async_trait]
213impl LlmProvider for VertexProvider {
214 async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
215 if self.is_claude_model() {
216 return self.chat_claude(request).await;
217 }
218 self.chat_gemini(request).await
219 }
220
221 fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
222 if self.is_claude_model() {
223 return self.chat_stream_claude(request);
224 }
225 self.chat_stream_gemini(request)
226 }
227
228 fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
229 let Some(thinking) = thinking else {
230 return Ok(());
231 };
232
233 if self
234 .capabilities()
235 .is_some_and(|caps| !caps.supports_thinking)
236 {
237 return Err(anyhow::anyhow!(
238 "thinking is not supported for provider={} model={}",
239 self.provider(),
240 self.model()
241 ));
242 }
243
244 if matches!(thinking.mode, ThinkingMode::Adaptive)
245 && !self
246 .capabilities()
247 .is_some_and(|caps| caps.supports_adaptive_thinking)
248 {
249 return Err(anyhow::anyhow!(
250 "adaptive thinking is not supported for provider={} model={}",
251 self.provider(),
252 self.model()
253 ));
254 }
255
256 if self.is_claude_model()
257 && self.requires_anthropic_adaptive_thinking()
258 && matches!(thinking.mode, ThinkingMode::Enabled { .. })
259 {
260 return Err(anyhow::anyhow!(
261 "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
262 self.provider(),
263 self.model()
264 ));
265 }
266
267 Ok(())
268 }
269
270 fn model(&self) -> &str {
271 &self.model
272 }
273
274 fn provider(&self) -> &'static str {
275 "vertex"
276 }
277
278 fn configured_thinking(&self) -> Option<&ThinkingConfig> {
279 self.thinking.as_ref()
280 }
281
282 fn default_max_tokens(&self) -> u32 {
283 let provider = if self.is_claude_model() {
284 "anthropic"
285 } else {
286 "gemini"
287 };
288 let model_max = self
289 .capabilities()
290 .and_then(|caps| caps.max_output_tokens)
291 .or_else(|| {
292 crate::model_capabilities::default_max_output_tokens(provider, self.model())
293 })
294 .unwrap_or(4096);
295 model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
296 }
297}
298
299impl VertexProvider {
304 #[allow(clippy::too_many_lines)]
305 async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
306 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
307 Ok(thinking) => thinking,
308 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
309 };
310 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
311 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
312 }
313 let contents = build_api_contents(&request.messages);
314 let tools = request
315 .tools
316 .as_ref()
317 .map(|t| convert_tools_to_config(t.clone()));
318 let tool_config = request
319 .tool_choice
320 .as_ref()
321 .map(ApiFunctionCallingConfig::from_tool_choice);
322 let system_instruction = if request.system.is_empty() {
323 None
324 } else {
325 Some(ApiContent {
326 role: None,
327 parts: vec![ApiPart::Text {
328 text: request.system.clone(),
329 thought_signature: None,
330 }],
331 })
332 };
333
334 let thinking_config = thinking.as_ref().map(map_thinking_config);
335 let (response_mime_type, response_schema) =
336 request.response_format.as_ref().map_or((None, None), |rf| {
337 (
338 Some("application/json"),
339 Some(gemini_response_schema(&rf.schema)),
340 )
341 });
342
343 let max_tokens = self.effective_max_tokens(&request);
344 let api_request = ApiGenerateContentRequest {
345 contents: &contents,
346 system_instruction: system_instruction.as_ref(),
347 tools: tools.as_ref().map(std::slice::from_ref),
348 tool_config,
349 generation_config: Some(ApiGenerationConfig {
350 max_output_tokens: Some(max_tokens),
351 thinking_config,
352 response_mime_type,
353 response_schema,
354 }),
355 cached_content: request.cached_content.as_deref(),
356 };
357
358 log::debug!(
359 "Vertex AI LLM request model={} max_tokens={}",
360 self.model,
361 max_tokens
362 );
363
364 let url = format!("{}:generateContent", self.base_url("google"));
365
366 let response = self
367 .client
368 .post(&url)
369 .header("Content-Type", "application/json")
370 .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
371 .bearer_auth(&self.access_token)
372 .json(&api_request)
373 .send()
374 .await
375 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
376
377 let status = response.status();
378 let bytes = response
379 .bytes()
380 .await
381 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
382
383 log::debug!(
384 "Vertex AI LLM response status={} body_len={}",
385 status,
386 bytes.len()
387 );
388
389 if status == StatusCode::TOO_MANY_REQUESTS {
390 return Ok(ChatOutcome::RateLimited);
391 }
392
393 if status.is_server_error() {
394 let body = String::from_utf8_lossy(&bytes);
395 log::error!("Vertex AI server error status={status} body={body}");
396 return Ok(ChatOutcome::ServerError(body.into_owned()));
397 }
398
399 if status.is_client_error() {
400 let body = String::from_utf8_lossy(&bytes);
401 log::warn!("Vertex AI client error status={status} body={body}");
402 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
403 }
404
405 let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
406 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
407
408 let candidate = api_response
409 .candidates
410 .into_iter()
411 .next()
412 .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
413
414 let content = build_content_blocks(&candidate.content);
415
416 if content.is_empty() && !candidate.content.parts.is_empty() {
417 log::warn!(
418 "Vertex AI parts not converted to content blocks raw_parts={:?}",
419 candidate.content.parts
420 );
421 }
422
423 let has_tool_calls = content
424 .iter()
425 .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
426
427 let stop_reason = candidate
428 .finish_reason
429 .as_ref()
430 .map(|r| map_finish_reason(r, has_tool_calls));
431
432 let usage = api_response
433 .usage_metadata
434 .unwrap_or(ApiUsageMetadata {
435 prompt: 0,
436 candidates: 0,
437 cached_content: 0,
438 })
439 .into_usage();
440
441 Ok(ChatOutcome::Success(ChatResponse {
442 id: String::new(),
443 content,
444 model: self.model.clone(),
445 stop_reason,
446 usage,
447 }))
448 }
449
450 fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
451 Box::pin(async_stream::stream! {
452 let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
453 Ok(thinking) => thinking,
454 Err(error) => {
455 yield Ok(StreamDelta::Error {
456 message: error.to_string(),
457 kind: StreamErrorKind::InvalidRequest,
458 });
459 return;
460 }
461 };
462 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
463 yield Ok(StreamDelta::Error {
464 message: error.to_string(),
465 kind: StreamErrorKind::InvalidRequest,
466 });
467 return;
468 }
469
470 let contents = build_api_contents(&request.messages);
471 let tools = request
472 .tools
473 .as_ref()
474 .map(|t| convert_tools_to_config(t.clone()));
475 let tool_config = request
476 .tool_choice
477 .as_ref()
478 .map(ApiFunctionCallingConfig::from_tool_choice);
479 let system_instruction = build_gemini_system_instruction(&request.system);
480 let thinking_config = thinking.as_ref().map(map_thinking_config);
481 let (response_mime_type, response_schema) =
482 gemini_response_format(request.response_format.as_ref());
483
484 let max_tokens = self.effective_max_tokens(&request);
485 let api_request = ApiGenerateContentRequest {
486 contents: &contents,
487 system_instruction: system_instruction.as_ref(),
488 tools: tools.as_ref().map(std::slice::from_ref),
489 tool_config,
490 generation_config: Some(ApiGenerationConfig {
491 max_output_tokens: Some(max_tokens),
492 thinking_config,
493 response_mime_type,
494 response_schema,
495 }),
496 cached_content: request.cached_content.as_deref(),
497 };
498
499 log::debug!(
500 "Vertex AI streaming LLM request model={} max_tokens={}",
501 self.model,
502 max_tokens
503 );
504
505 let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
506
507 let response = match self.send_gemini_stream_request(&url, &api_request).await {
508 Ok(response) => response,
509 Err(item) => {
510 yield item;
511 return;
512 }
513 };
514
515 let mut inner = stream_gemini_response(response);
516 while let Some(item) = futures::StreamExt::next(&mut inner).await {
517 yield item;
518 }
519 })
520 }
521
522 async fn send_gemini_stream_request(
530 &self,
531 url: &str,
532 api_request: &ApiGenerateContentRequest<'_>,
533 ) -> Result<reqwest::Response, anyhow::Result<StreamDelta>> {
534 let response = match self
535 .client
536 .post(url)
537 .header("Content-Type", "application/json")
538 .bearer_auth(&self.access_token)
539 .json(api_request)
540 .send()
541 .await
542 {
543 Ok(response) => response,
544 Err(e) => return Err(Err(anyhow::anyhow!("request failed: {e}"))),
546 };
547
548 let status = response.status();
549 if !status.is_success() {
550 let body = response.text().await.unwrap_or_default();
551 let kind = if status == StatusCode::TOO_MANY_REQUESTS {
552 StreamErrorKind::RateLimited
553 } else if status.is_server_error() {
554 StreamErrorKind::ServerError
555 } else {
556 StreamErrorKind::InvalidRequest
557 };
558 log::warn!("Vertex AI error status={status} body={body}");
559 return Err(Ok(StreamDelta::Error {
560 message: body,
561 kind,
562 }));
563 }
564
565 Ok(response)
566 }
567}
568
569fn build_gemini_system_instruction(system: &str) -> Option<ApiContent> {
572 if system.is_empty() {
573 None
574 } else {
575 Some(ApiContent {
576 role: None,
577 parts: vec![ApiPart::Text {
578 text: system.to_owned(),
579 thought_signature: None,
580 }],
581 })
582 }
583}
584
585fn gemini_response_format(
588 response_format: Option<&ResponseFormat>,
589) -> (Option<&'static str>, Option<serde_json::Value>) {
590 response_format.map_or((None, None), |rf| {
591 (
592 Some("application/json"),
593 Some(gemini_response_schema(&rf.schema)),
594 )
595 })
596}
597
598impl VertexProvider {
603 async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
604 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
605 Ok(thinking) => thinking,
606 Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
607 };
608 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
609 return Ok(ChatOutcome::InvalidRequest(error.to_string()));
610 }
611 let messages = Self::build_cached_vertex_claude_messages(&request);
612 let tools = anthropic_data::build_api_tools(&request);
613 let thinking = thinking_config
614 .as_ref()
615 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
616 let output_config = thinking_config
617 .as_ref()
618 .and_then(|t| t.effort)
619 .map(|effort| anthropic_data::ApiOutputConfig { effort });
620 let system = Self::build_vertex_claude_system_prompt(&request.system);
621 let tool_choice = request
622 .tool_choice
623 .as_ref()
624 .map(anthropic_data::ApiToolChoice::from_tool_choice);
625
626 let max_tokens = self.effective_max_tokens(&request);
627 let api_request = anthropic_data::ApiMessagesRequest {
628 model: None, max_tokens,
630 system,
631 messages: &messages,
632 tools: tools.as_deref(),
633 tool_choice,
634 stream: false,
635 thinking,
636 output_config,
637 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
638 };
639
640 log::debug!(
641 "Vertex AI (Claude) LLM request model={} max_tokens={}",
642 self.model,
643 max_tokens
644 );
645
646 if log::log_enabled!(log::Level::Debug) {
647 match serde_json::to_string_pretty(&api_request) {
648 Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
649 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
650 }
651 }
652
653 let url = format!("{}:rawPredict", self.base_url("anthropic"));
654
655 let response = self
656 .client
657 .post(&url)
658 .header("Content-Type", "application/json")
659 .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
660 .bearer_auth(&self.access_token)
661 .json(&api_request)
662 .send()
663 .await
664 .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
665
666 let status = response.status();
667 let bytes = response
668 .bytes()
669 .await
670 .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
671
672 log::debug!(
673 "Vertex AI (Claude) response status={} body_len={}",
674 status,
675 bytes.len()
676 );
677
678 if status == StatusCode::TOO_MANY_REQUESTS {
679 return Ok(ChatOutcome::RateLimited);
680 }
681
682 if status.is_server_error() {
683 let body = String::from_utf8_lossy(&bytes);
684 log::error!("Vertex AI (Claude) server error status={status} body={body}");
685 return Ok(ChatOutcome::ServerError(body.into_owned()));
686 }
687
688 if status.is_client_error() {
689 let body = String::from_utf8_lossy(&bytes);
690 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
691 return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
692 }
693
694 let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
695 .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
696
697 log::debug!(
698 "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
699 api_response.id,
700 api_response.model,
701 api_response.stop_reason,
702 api_response.usage.total_input_tokens(),
703 api_response.usage.output,
704 api_response.content.len()
705 );
706
707 Ok(ChatOutcome::Success(Self::map_claude_response(
708 api_response,
709 )))
710 }
711
712 #[allow(clippy::too_many_lines)]
713 fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
714 Box::pin(async_stream::stream! {
715 let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
716 Ok(thinking) => thinking,
717 Err(error) => {
718 yield Ok(StreamDelta::Error {
719 message: error.to_string(),
720 kind: StreamErrorKind::InvalidRequest,
721 });
722 return;
723 }
724 };
725 if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
726 yield Ok(StreamDelta::Error {
727 message: error.to_string(),
728 kind: StreamErrorKind::InvalidRequest,
729 });
730 return;
731 }
732 let messages = Self::build_cached_vertex_claude_messages(&request);
733 let tools = anthropic_data::build_api_tools(&request);
734 let thinking = thinking_config
735 .as_ref()
736 .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
737 let output_config = thinking_config
738 .as_ref()
739 .and_then(|t| t.effort)
740 .map(|effort| anthropic_data::ApiOutputConfig { effort });
741 let system = Self::build_vertex_claude_system_prompt(&request.system);
742 let tool_choice = request
743 .tool_choice
744 .as_ref()
745 .map(anthropic_data::ApiToolChoice::from_tool_choice);
746
747 let max_tokens = self.effective_max_tokens(&request);
748 let api_request = anthropic_data::ApiMessagesRequest {
749 model: None, max_tokens,
751 system,
752 messages: &messages,
753 tools: tools.as_deref(),
754 tool_choice,
755 stream: true,
756 thinking,
757 output_config,
758 anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
759 };
760
761 log::debug!(
762 "Vertex AI (Claude) streaming request model={} max_tokens={}",
763 self.model,
764 max_tokens
765 );
766
767 if log::log_enabled!(log::Level::Debug) {
768 match serde_json::to_string_pretty(&api_request) {
769 Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
770 Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
771 }
772 }
773
774 let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
775
776 let response = match self
777 .client
778 .post(&url)
779 .header("Content-Type", "application/json")
780 .bearer_auth(&self.access_token)
781 .json(&api_request)
782 .send()
783 .await
784 {
785 Ok(r) => r,
786 Err(e) => {
787 yield Err(anyhow::anyhow!("request failed: {e}"));
788 return;
789 }
790 };
791
792 let status = response.status();
793
794 if status == StatusCode::TOO_MANY_REQUESTS {
795 yield Ok(StreamDelta::Error {
796 message: "Rate limited".to_string(),
797 kind: StreamErrorKind::RateLimited,
798 });
799 return;
800 }
801
802 if status.is_server_error() {
803 let body = response.text().await.unwrap_or_default();
804 log::error!("Vertex AI (Claude) server error status={status} body={body}");
805 yield Ok(StreamDelta::Error {
806 message: body,
807 kind: StreamErrorKind::ServerError,
808 });
809 return;
810 }
811
812 if status.is_client_error() {
813 let body = response.text().await.unwrap_or_default();
814 log::warn!("Vertex AI (Claude) client error status={status} body={body}");
815 yield Ok(StreamDelta::Error {
816 message: body,
817 kind: StreamErrorKind::InvalidRequest,
818 });
819 return;
820 }
821
822 let mut stream = response.bytes_stream();
824 let mut buffer = String::new();
825 let mut input_tokens: u32 = 0;
826 let mut output_tokens: u32 = 0;
827 let mut cached_input_tokens: u32 = 0;
828 let mut cache_creation_input_tokens: u32 = 0;
829 let mut tool_ids: std::collections::HashMap<usize, String> =
830 std::collections::HashMap::new();
831 let mut received_message_stop = false;
832 let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
833
834 while let Some(chunk_result) = stream.next().await {
835 let chunk = match chunk_result {
836 Ok(c) => c,
837 Err(e) => {
838 yield Err(anyhow::anyhow!("stream error: {e}"));
840 return;
841 }
842 };
843
844 buffer.push_str(&String::from_utf8_lossy(&chunk));
845
846 while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
848 if anthropic_data::is_message_stop_event(&event_block) {
849 received_message_stop = true;
850 }
851
852 if let Some(delta) = anthropic_data::parse_sse_event(
853 &event_block,
854 &mut input_tokens,
855 &mut output_tokens,
856 &mut cached_input_tokens,
857 &mut cache_creation_input_tokens,
858 &mut tool_ids,
859 &mut pending_stop_reason,
860 ) {
861 yield Ok(delta);
862 }
863 if anthropic_data::is_message_stop_event(&event_block) {
864 yield Ok(StreamDelta::Done {
865 stop_reason: pending_stop_reason.take(),
866 });
867 }
868 }
869 }
870
871 let remaining = buffer.trim();
873 if !remaining.is_empty() {
874 if anthropic_data::is_message_stop_event(remaining) {
875 received_message_stop = true;
876 }
877
878 if let Some(delta) = anthropic_data::parse_sse_event(
879 remaining,
880 &mut input_tokens,
881 &mut output_tokens,
882 &mut cached_input_tokens,
883 &mut cache_creation_input_tokens,
884 &mut tool_ids,
885 &mut pending_stop_reason,
886 ) {
887 yield Ok(delta);
888 }
889 if anthropic_data::is_message_stop_event(remaining) {
890 yield Ok(StreamDelta::Done {
891 stop_reason: pending_stop_reason.take(),
892 });
893 }
894 }
895
896 if !received_message_stop {
897 log::warn!(
898 "Vertex AI (Claude) SSE stream ended without message_stop"
899 );
900 yield Ok(StreamDelta::Error {
901 message: "Stream ended unexpectedly without completion".to_string(),
902 kind: StreamErrorKind::ServerError,
903 });
904 }
905 })
906 }
907}
908
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a provider with a placeholder token for the given project, region and model.
    fn make_provider(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_string(),
            project.to_string(),
            region.to_string(),
            model.to_string(),
        )
    }

    /// Asserts that `model` rejects a budgeted thinking config and that the
    /// error points the caller at `ThinkingConfig::adaptive()`.
    fn assert_rejects_budgeted_thinking(model: &str) {
        let provider = make_provider("project", "global", model);
        let budgeted = ThinkingConfig::new(10_000);

        let error = provider
            .validate_thinking_config(Some(&budgeted))
            .unwrap_err();
        assert!(error.to_string().contains("ThinkingConfig::adaptive()"));
    }

    #[test]
    fn test_new_creates_provider() {
        let provider = make_provider("my-project", "us-central1", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let provider = VertexProvider::flash(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let provider = VertexProvider::pro(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let provider = make_provider("my-project", "us-central1", "test-model");
        let cloned = provider.clone();

        assert_eq!(provider.model(), cloned.model());
        assert_eq!(provider.provider(), cloned.provider());
    }

    #[test]
    fn test_is_claude_model() {
        let claude_provider = make_provider("project", "us-central1", "claude-sonnet-4-20250514");
        assert!(claude_provider.is_claude_model());

        let gemini_provider = make_provider("project", "us-central1", "gemini-3-flash-preview");
        assert!(!gemini_provider.is_claude_model());
    }

    #[test]
    fn test_base_url_gemini() {
        let provider = make_provider("my-project", "us-central1", "gemini-3-flash-preview");

        let url = provider.base_url("google");
        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let provider = make_provider("my-project", "us-central1", "claude-sonnet-4-20250514");

        let url = provider.base_url("anthropic");
        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let provider = make_provider("other-project", "europe-west4", "gemini-3.1-pro-preview");

        let url = provider.base_url("google");
        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let provider = make_provider("my-project", "global", "gemini-3.1-pro-preview");

        let url = provider.base_url("google");
        assert_eq!(
            url,
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_SONNET_46);
    }

    #[test]
    fn test_vertex_claude_opus_47_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_47);
    }

    #[test]
    fn test_vertex_claude_opus_48_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_48);
    }

    #[test]
    fn test_vertex_claude_fable_5_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_FABLE_5);
    }

    #[test]
    fn test_model_constants() {
        assert_eq!(MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview");
        assert_eq!(MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview");
        assert_eq!(MODEL_GEMINI_3_PRO, "gemini-3.0-pro");
    }

    /// Builds a minimal single-message request with the given token budget
    /// and explicit-budget flag.
    fn request_with_max_tokens(max_tokens: u32, explicit: bool) -> ChatRequest {
        ChatRequest {
            system: String::new(),
            messages: vec![agent_sdk_foundation::llm::Message::user("hi")],
            tools: None,
            max_tokens,
            max_tokens_explicit: explicit,
            session_id: None,
            cached_content: None,
            thinking: None,
            tool_choice: None,
            response_format: None,
        }
    }

    #[test]
    fn test_effective_max_tokens_honors_explicit_budget() {
        let provider = make_provider("project", "global", MODEL_SONNET_46);
        let request = request_with_max_tokens(1234, true);

        assert_eq!(provider.effective_max_tokens(&request), 1234);
    }

    #[test]
    fn test_effective_max_tokens_uses_clamped_default_when_implicit() {
        let provider = make_provider("project", "global", MODEL_SONNET_46);
        let request = request_with_max_tokens(4096, false);

        let effective = provider.effective_max_tokens(&request);
        assert_eq!(effective, provider.default_max_tokens());
        assert!(effective <= DEFAULT_SAFE_MAX_OUTPUT_TOKENS);
    }
}