Skip to main content

agent_sdk_providers/impls/
vertex.rs

1//! Google Vertex AI provider implementation.
2//!
3//! This module provides an implementation of `LlmProvider` for the Google Vertex AI
4//! platform. It supports both Gemini models (using the Gemini API format) and
5//! Claude models (using the Anthropic Messages API format via `rawPredict`).
6//!
7//! Publisher detection is automatic based on the model name:
8//! - `claude-*` models route to `publishers/anthropic` using `rawPredict`
9//! - All other models route to `publishers/google` using `generateContent`
10
11use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{
13    MODEL_FABLE_5, MODEL_OPUS_46, MODEL_OPUS_47, MODEL_OPUS_48, MODEL_SONNET_46,
14    data as anthropic_data,
15};
16use crate::impls::gemini::data::{
17    ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
18    ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
19    convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
20    stream_gemini_response,
21};
22use crate::provider::LlmProvider;
23use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
24use agent_sdk_foundation::llm::{
25    ChatOutcome, ChatRequest, ChatResponse, ResponseFormat, ThinkingConfig, ThinkingMode, Usage,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::StreamExt;
30use reqwest::StatusCode;
31
/// Model identifier for Gemini 3 Flash Preview; used by [`VertexProvider::flash`].
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Model identifier for Gemini 3.1 Pro Preview; used by [`VertexProvider::pro`].
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";

/// Legacy Gemini 3.0 Pro model kept for explicit opt-in.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";

/// The Anthropic API version used for Claude models on Vertex AI.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
/// Upper bound that `default_max_tokens` clamps the implicit output budget to.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

/// Connect timeout for the HTTP client (matches Anthropic).
const CONNECT_TIMEOUT_SECS: u64 = 30;
/// TCP keepalive interval to keep long streaming connections from dropping.
const TCP_KEEPALIVE_SECS: u64 = 30;
/// Per-request read timeout for the **non-streaming** chat paths. Bounds a
/// black-holed endpoint so a single turn cannot hang the agent loop forever.
/// Streaming requests intentionally have no overall timeout.
const CHAT_READ_TIMEOUT_SECS: u64 = 300;
50
51const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
52    anthropic_data::ApiCacheControl::ephemeral_with_ttl(None)
53}
54
55/// Build the Claude-on-Vertex tool list, caching the tool prefix with the
56/// Vertex ephemeral breakpoint.
57fn build_vertex_claude_tools(request: &ChatRequest) -> Option<Vec<anthropic_data::ApiTool>> {
58    anthropic_data::build_api_tools_with_cache(request, Some(vertex_cache_control()))
59}
60
/// Google Vertex AI LLM provider.
///
/// Uses the same Gemini request/response format as `GeminiProvider` but
/// authenticates via `OAuth2` Bearer tokens and routes through the Vertex AI
/// regional endpoint.
///
/// Claude models are also supported — the provider detects the publisher from
/// the model name and uses the appropriate API format automatically.
#[derive(Clone)]
pub struct VertexProvider {
    /// HTTP client used for every request issued by this provider.
    client: reqwest::Client,
    /// Token sent as the Bearer credential on each request.
    access_token: String,
    /// Project segment of the endpoint path.
    project_id: String,
    /// Location segment of the endpoint path; `global` selects the domain
    /// without a region prefix.
    region: String,
    /// Model name; names starting with `claude-` use the Anthropic publisher.
    model: String,
    /// Provider-owned thinking configuration, set through `with_thinking`.
    thinking: Option<ThinkingConfig>,
}
78
79impl VertexProvider {
80    /// Create a new Vertex AI provider with full control over all parameters.
81    #[must_use]
82    pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
83        let client = reqwest::Client::builder()
84            .connect_timeout(std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS))
85            .tcp_keepalive(std::time::Duration::from_secs(TCP_KEEPALIVE_SECS))
86            .build()
87            .unwrap_or_else(|error| {
88                log::warn!(
89                    "failed to build Vertex HTTP client with timeouts ({error}); using default client"
90                );
91                reqwest::Client::new()
92            });
93
94        Self {
95            client,
96            access_token,
97            project_id,
98            region,
99            model,
100            thinking: None,
101        }
102    }
103
104    /// Create a provider using Gemini 3 Flash Preview on Vertex AI.
105    #[must_use]
106    pub fn flash(access_token: String, project_id: String, region: String) -> Self {
107        Self::new(
108            access_token,
109            project_id,
110            region,
111            MODEL_GEMINI_3_FLASH.to_owned(),
112        )
113    }
114
115    /// Create a provider using Gemini 3.1 Pro Preview on Vertex AI.
116    #[must_use]
117    pub fn pro(access_token: String, project_id: String, region: String) -> Self {
118        Self::new(
119            access_token,
120            project_id,
121            region,
122            MODEL_GEMINI_31_PRO.to_owned(),
123        )
124    }
125
126    /// Detect whether the model is a Claude model (Anthropic publisher).
127    fn is_claude_model(&self) -> bool {
128        self.model.starts_with("claude-")
129    }
130
131    /// Build the base URL for the given publisher and model.
132    ///
133    /// For the `global` location the domain is `aiplatform.googleapis.com`
134    /// (no region prefix). Regional locations use `{region}-aiplatform.googleapis.com`.
135    fn base_url(&self, publisher: &str) -> String {
136        let domain = if self.region == "global" {
137            "aiplatform.googleapis.com".to_owned()
138        } else {
139            format!("{}-aiplatform.googleapis.com", self.region)
140        };
141        format!(
142            "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
143            domain = domain,
144            region = self.region,
145            project = self.project_id,
146            publisher = publisher,
147            model = self.model,
148        )
149    }
150
    /// Set the provider-owned thinking configuration for this model.
    ///
    /// Builder-style: consumes the provider, stores `thinking` (replacing any
    /// previously stored configuration) and returns the updated provider.
    #[must_use]
    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
        self.thinking = Some(thinking);
        self
    }
157
158    fn requires_anthropic_adaptive_thinking(&self) -> bool {
159        matches!(
160            self.model.as_str(),
161            MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
162        )
163    }
164
165    fn build_cached_vertex_claude_messages(
166        request: &ChatRequest,
167    ) -> Vec<anthropic_data::ApiMessage> {
168        let mut messages = anthropic_data::build_api_messages(request);
169        anthropic_data::apply_cache_control_to_last_user_message(
170            &mut messages,
171            vertex_cache_control(),
172        );
173        messages
174    }
175
176    fn build_vertex_claude_system_prompt(
177        system: &str,
178    ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
179        anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
180    }
181
182    /// Effective output-token budget for a request.
183    ///
184    /// Mirrors the Anthropic provider: an implicit budget falls back to the
185    /// provider/model default ([`default_max_tokens`](LlmProvider::default_max_tokens),
186    /// which clamps to Vertex's safe ceiling) instead of silently capping at
187    /// `ChatRequest::DEFAULT_MAX_TOKENS`.
188    fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
189        if request.max_tokens_explicit {
190            request.max_tokens
191        } else {
192            self.default_max_tokens()
193        }
194    }
195
196    fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
197        let content = anthropic_data::map_content_blocks(api_response.content);
198        let stop_reason = api_response
199            .stop_reason
200            .as_ref()
201            .map(anthropic_data::map_stop_reason);
202
203        ChatResponse {
204            id: api_response.id,
205            content,
206            model: api_response.model,
207            stop_reason,
208            usage: Usage {
209                input_tokens: api_response.usage.total_input_tokens(),
210                output_tokens: api_response.usage.output,
211                cached_input_tokens: api_response.usage.cached_input_tokens(),
212                cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
213            },
214        }
215    }
216}
217
218#[async_trait]
219impl LlmProvider for VertexProvider {
220    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
221        if self.is_claude_model() {
222            return self.chat_claude(request).await;
223        }
224        self.chat_gemini(request).await
225    }
226
227    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
228        if self.is_claude_model() {
229            return self.chat_stream_claude(request);
230        }
231        self.chat_stream_gemini(request)
232    }
233
234    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
235        let Some(thinking) = thinking else {
236            return Ok(());
237        };
238
239        if self
240            .capabilities()
241            .is_some_and(|caps| !caps.supports_thinking)
242        {
243            return Err(anyhow::anyhow!(
244                "thinking is not supported for provider={} model={}",
245                self.provider(),
246                self.model()
247            ));
248        }
249
250        if matches!(thinking.mode, ThinkingMode::Adaptive)
251            && !self
252                .capabilities()
253                .is_some_and(|caps| caps.supports_adaptive_thinking)
254        {
255            return Err(anyhow::anyhow!(
256                "adaptive thinking is not supported for provider={} model={}",
257                self.provider(),
258                self.model()
259            ));
260        }
261
262        if self.is_claude_model()
263            && self.requires_anthropic_adaptive_thinking()
264            && matches!(thinking.mode, ThinkingMode::Enabled { .. })
265        {
266            return Err(anyhow::anyhow!(
267                "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
268                self.provider(),
269                self.model()
270            ));
271        }
272
273        Ok(())
274    }
275
276    fn model(&self) -> &str {
277        &self.model
278    }
279
    /// Provider identifier; always `"vertex"`, for both Gemini and Claude models.
    fn provider(&self) -> &'static str {
        "vertex"
    }
283
    /// Thinking configuration stored on the provider, or `None` when it was
    /// never set via `with_thinking`.
    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
        self.thinking.as_ref()
    }
287
288    fn default_max_tokens(&self) -> u32 {
289        let provider = if self.is_claude_model() {
290            "anthropic"
291        } else {
292            "gemini"
293        };
294        let model_max = self
295            .capabilities()
296            .and_then(|caps| caps.max_output_tokens)
297            .or_else(|| {
298                crate::model_capabilities::default_max_output_tokens(provider, self.model())
299            })
300            .unwrap_or(4096);
301        model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
302    }
303}
304
305// ============================================================================
306// Gemini path (publishers/google)
307// ============================================================================
308
309impl VertexProvider {
310    #[allow(clippy::too_many_lines)]
311    async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
312        let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
313            Ok(thinking) => thinking,
314            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
315        };
316        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
317            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
318        }
319        let contents = build_api_contents(&request.messages);
320        let tools = request
321            .tools
322            .as_ref()
323            .map(|t| convert_tools_to_config(t.clone()));
324        let tool_config = request
325            .tool_choice
326            .as_ref()
327            .map(ApiFunctionCallingConfig::from_tool_choice);
328        let system_instruction = if request.system.is_empty() {
329            None
330        } else {
331            Some(ApiContent {
332                role: None,
333                parts: vec![ApiPart::Text {
334                    text: request.system.clone(),
335                    thought_signature: None,
336                }],
337            })
338        };
339
340        let thinking_config = thinking.as_ref().map(map_thinking_config);
341        let (response_mime_type, response_schema) =
342            request.response_format.as_ref().map_or((None, None), |rf| {
343                (
344                    Some("application/json"),
345                    Some(gemini_response_schema(&rf.schema)),
346                )
347            });
348
349        let max_tokens = self.effective_max_tokens(&request);
350        let api_request = ApiGenerateContentRequest {
351            contents: &contents,
352            system_instruction: system_instruction.as_ref(),
353            tools: tools.as_ref().map(std::slice::from_ref),
354            tool_config,
355            generation_config: Some(ApiGenerationConfig {
356                max_output_tokens: Some(max_tokens),
357                thinking_config,
358                response_mime_type,
359                response_schema,
360            }),
361            cached_content: request.cached_content.as_deref(),
362        };
363
364        log::debug!(
365            "Vertex AI LLM request model={} max_tokens={}",
366            self.model,
367            max_tokens
368        );
369
370        let url = format!("{}:generateContent", self.base_url("google"));
371
372        let response = self
373            .client
374            .post(&url)
375            .header("Content-Type", "application/json")
376            .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
377            .bearer_auth(&self.access_token)
378            .json(&api_request)
379            .send()
380            .await
381            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
382
383        let status = response.status();
384        // Read `Retry-After` off the 429 response before the body is consumed.
385        let retry_after = if status == StatusCode::TOO_MANY_REQUESTS {
386            crate::http::retry_after_from_headers(response.headers())
387        } else {
388            None
389        };
390        let bytes = response
391            .bytes()
392            .await
393            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
394
395        log::debug!(
396            "Vertex AI LLM response status={} body_len={}",
397            status,
398            bytes.len()
399        );
400
401        if status == StatusCode::TOO_MANY_REQUESTS {
402            return Ok(ChatOutcome::RateLimited(retry_after));
403        }
404
405        if status.is_server_error() {
406            let body = String::from_utf8_lossy(&bytes);
407            log::error!("Vertex AI server error status={status} body={body}");
408            return Ok(ChatOutcome::ServerError(body.into_owned()));
409        }
410
411        if status.is_client_error() {
412            let body = String::from_utf8_lossy(&bytes);
413            log::warn!("Vertex AI client error status={status} body={body}");
414            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
415        }
416
417        let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
418            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
419
420        let candidate = api_response
421            .candidates
422            .into_iter()
423            .next()
424            .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
425
426        let content = build_content_blocks(&candidate.content);
427
428        if content.is_empty() && !candidate.content.parts.is_empty() {
429            log::warn!(
430                "Vertex AI parts not converted to content blocks raw_parts={:?}",
431                candidate.content.parts
432            );
433        }
434
435        let has_tool_calls = content
436            .iter()
437            .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
438
439        let stop_reason = candidate
440            .finish_reason
441            .as_ref()
442            .map(|r| map_finish_reason(r, has_tool_calls));
443
444        let usage = api_response
445            .usage_metadata
446            .unwrap_or(ApiUsageMetadata {
447                prompt: 0,
448                candidates: 0,
449                cached_content: 0,
450            })
451            .into_usage();
452
453        Ok(ChatOutcome::Success(ChatResponse {
454            id: String::new(),
455            content,
456            model: self.model.clone(),
457            stop_reason,
458            usage,
459        }))
460    }
461
462    fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
463        Box::pin(async_stream::stream! {
464            let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
465                Ok(thinking) => thinking,
466                Err(error) => {
467                    yield Ok(StreamDelta::Error {
468                        message: error.to_string(),
469                        kind: StreamErrorKind::InvalidRequest,
470                    });
471                    return;
472                }
473            };
474            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
475                yield Ok(StreamDelta::Error {
476                    message: error.to_string(),
477                    kind: StreamErrorKind::InvalidRequest,
478                });
479                return;
480            }
481
482            let contents = build_api_contents(&request.messages);
483            let tools = request
484                .tools
485                .as_ref()
486                .map(|t| convert_tools_to_config(t.clone()));
487            let tool_config = request
488                .tool_choice
489                .as_ref()
490                .map(ApiFunctionCallingConfig::from_tool_choice);
491            let system_instruction = build_gemini_system_instruction(&request.system);
492            let thinking_config = thinking.as_ref().map(map_thinking_config);
493            let (response_mime_type, response_schema) =
494                gemini_response_format(request.response_format.as_ref());
495
496            let max_tokens = self.effective_max_tokens(&request);
497            let api_request = ApiGenerateContentRequest {
498                contents: &contents,
499                system_instruction: system_instruction.as_ref(),
500                tools: tools.as_ref().map(std::slice::from_ref),
501                tool_config,
502                generation_config: Some(ApiGenerationConfig {
503                    max_output_tokens: Some(max_tokens),
504                    thinking_config,
505                    response_mime_type,
506                    response_schema,
507                }),
508                cached_content: request.cached_content.as_deref(),
509            };
510
511            log::debug!(
512                "Vertex AI streaming LLM request model={} max_tokens={}",
513                self.model,
514                max_tokens
515            );
516
517            let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
518
519            let response = match self.send_gemini_stream_request(&url, &api_request).await {
520                Ok(response) => response,
521                Err(item) => {
522                    yield item;
523                    return;
524                }
525            };
526
527            let mut inner = stream_gemini_response(response);
528            while let Some(item) = futures::StreamExt::next(&mut inner).await {
529                yield item;
530            }
531        })
532    }
533
534    /// Issue the Vertex Gemini streaming request, returning the raw response on
535    /// success or a ready-to-`yield` stream item describing the failure.
536    ///
537    /// The `Err` payload is the exact `StreamBox` item to `yield`: an
538    /// `anyhow::Error` for a transport failure, or a classified
539    /// [`StreamDelta::Error`] for a non-success HTTP status. This keeps the
540    /// generator's failure handling to a single `yield`.
541    async fn send_gemini_stream_request(
542        &self,
543        url: &str,
544        api_request: &ApiGenerateContentRequest<'_>,
545    ) -> Result<reqwest::Response, anyhow::Result<StreamDelta>> {
546        let response = match self
547            .client
548            .post(url)
549            .header("Content-Type", "application/json")
550            .bearer_auth(&self.access_token)
551            .json(api_request)
552            .send()
553            .await
554        {
555            Ok(response) => response,
556            // Include the cause so 401 detection / diagnostics survive.
557            Err(e) => return Err(Err(anyhow::anyhow!("request failed: {e}"))),
558        };
559
560        let status = response.status();
561        if !status.is_success() {
562            let body = response.text().await.unwrap_or_default();
563            let kind = if status == StatusCode::TOO_MANY_REQUESTS {
564                StreamErrorKind::RateLimited
565            } else if status.is_server_error() {
566                StreamErrorKind::ServerError
567            } else {
568                StreamErrorKind::InvalidRequest
569            };
570            log::warn!("Vertex AI error status={status} body={body}");
571            return Err(Ok(StreamDelta::Error {
572                message: body,
573                kind,
574            }));
575        }
576
577        Ok(response)
578    }
579}
580
581/// Build the Gemini `system_instruction` content from the request system prompt,
582/// or `None` when the prompt is empty.
583fn build_gemini_system_instruction(system: &str) -> Option<ApiContent> {
584    if system.is_empty() {
585        None
586    } else {
587        Some(ApiContent {
588            role: None,
589            parts: vec![ApiPart::Text {
590                text: system.to_owned(),
591                thought_signature: None,
592            }],
593        })
594    }
595}
596
597/// Map an optional response format into Gemini's `(responseMimeType,
598/// responseSchema)` pair, sanitizing the schema to the subset Gemini accepts.
599fn gemini_response_format(
600    response_format: Option<&ResponseFormat>,
601) -> (Option<&'static str>, Option<serde_json::Value>) {
602    response_format.map_or((None, None), |rf| {
603        (
604            Some("application/json"),
605            Some(gemini_response_schema(&rf.schema)),
606        )
607    })
608}
609
610// ============================================================================
611// Claude path (publishers/anthropic)
612// ============================================================================
613
614impl VertexProvider {
615    async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
616        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
617            Ok(thinking) => thinking,
618            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
619        };
620        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
621            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
622        }
623        let messages = Self::build_cached_vertex_claude_messages(&request);
624        let tools = build_vertex_claude_tools(&request);
625        let thinking = thinking_config
626            .as_ref()
627            .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
628        let output_config = thinking_config
629            .as_ref()
630            .and_then(|t| t.effort)
631            .map(|effort| anthropic_data::ApiOutputConfig { effort });
632        let system = Self::build_vertex_claude_system_prompt(&request.system);
633        let tool_choice = request
634            .tool_choice
635            .as_ref()
636            .map(anthropic_data::ApiToolChoice::from_tool_choice);
637
638        let max_tokens = self.effective_max_tokens(&request);
639        let api_request = anthropic_data::ApiMessagesRequest {
640            model: None, // model is in the URL for Vertex
641            max_tokens,
642            system,
643            messages: &messages,
644            tools: tools.as_deref(),
645            tool_choice,
646            stream: false,
647            thinking,
648            output_config,
649            anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
650        };
651
652        log::debug!(
653            "Vertex AI (Claude) LLM request model={} max_tokens={}",
654            self.model,
655            max_tokens
656        );
657
658        if log::log_enabled!(log::Level::Debug) {
659            match serde_json::to_string_pretty(&api_request) {
660                Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
661                Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
662            }
663        }
664
665        let url = format!("{}:rawPredict", self.base_url("anthropic"));
666
667        let response = self
668            .client
669            .post(&url)
670            .header("Content-Type", "application/json")
671            .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
672            .bearer_auth(&self.access_token)
673            .json(&api_request)
674            .send()
675            .await
676            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
677
678        let status = response.status();
679        // Read `Retry-After` off the 429 response before the body is consumed.
680        let retry_after = if status == StatusCode::TOO_MANY_REQUESTS {
681            crate::http::retry_after_from_headers(response.headers())
682        } else {
683            None
684        };
685        let bytes = response
686            .bytes()
687            .await
688            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
689
690        log::debug!(
691            "Vertex AI (Claude) response status={} body_len={}",
692            status,
693            bytes.len()
694        );
695
696        if status == StatusCode::TOO_MANY_REQUESTS {
697            return Ok(ChatOutcome::RateLimited(retry_after));
698        }
699
700        if status.is_server_error() {
701            let body = String::from_utf8_lossy(&bytes);
702            log::error!("Vertex AI (Claude) server error status={status} body={body}");
703            return Ok(ChatOutcome::ServerError(body.into_owned()));
704        }
705
706        if status.is_client_error() {
707            let body = String::from_utf8_lossy(&bytes);
708            log::warn!("Vertex AI (Claude) client error status={status} body={body}");
709            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
710        }
711
712        let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
713            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
714
715        log::debug!(
716            "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
717            api_response.id,
718            api_response.model,
719            api_response.stop_reason,
720            api_response.usage.total_input_tokens(),
721            api_response.usage.output,
722            api_response.content.len()
723        );
724
725        Ok(ChatOutcome::Success(Self::map_claude_response(
726            api_response,
727        )))
728    }
729
730    #[allow(clippy::too_many_lines)]
731    fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
732        Box::pin(async_stream::stream! {
733            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
734                Ok(thinking) => thinking,
735                Err(error) => {
736                    yield Ok(StreamDelta::Error {
737                        message: error.to_string(),
738                        kind: StreamErrorKind::InvalidRequest,
739                    });
740                    return;
741                }
742            };
743            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
744                yield Ok(StreamDelta::Error {
745                    message: error.to_string(),
746                    kind: StreamErrorKind::InvalidRequest,
747                });
748                return;
749            }
750            let messages = Self::build_cached_vertex_claude_messages(&request);
751            let tools = build_vertex_claude_tools(&request);
752            let thinking = thinking_config
753                .as_ref()
754                .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
755            let output_config = thinking_config
756                .as_ref()
757                .and_then(|t| t.effort)
758                .map(|effort| anthropic_data::ApiOutputConfig { effort });
759            let system = Self::build_vertex_claude_system_prompt(&request.system);
760            let tool_choice = request
761                .tool_choice
762                .as_ref()
763                .map(anthropic_data::ApiToolChoice::from_tool_choice);
764
765            let max_tokens = self.effective_max_tokens(&request);
766            let api_request = anthropic_data::ApiMessagesRequest {
767                model: None, // model is in the URL for Vertex
768                max_tokens,
769                system,
770                messages: &messages,
771                tools: tools.as_deref(),
772                tool_choice,
773                stream: true,
774                thinking,
775                output_config,
776                anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
777            };
778
779            log::debug!(
780                "Vertex AI (Claude) streaming request model={} max_tokens={}",
781                self.model,
782                max_tokens
783            );
784
785            if log::log_enabled!(log::Level::Debug) {
786                match serde_json::to_string_pretty(&api_request) {
787                    Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
788                    Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
789                }
790            }
791
792            let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
793
794            let response = match self
795                .client
796                .post(&url)
797                .header("Content-Type", "application/json")
798                .bearer_auth(&self.access_token)
799                .json(&api_request)
800                .send()
801                .await
802            {
803                Ok(r) => r,
804                Err(e) => {
805                    yield Err(anyhow::anyhow!("request failed: {e}"));
806                    return;
807                }
808            };
809
810            let status = response.status();
811
812            if status == StatusCode::TOO_MANY_REQUESTS {
813                yield Ok(StreamDelta::Error {
814                    message: "Rate limited".to_string(),
815                    kind: StreamErrorKind::RateLimited,
816                });
817                return;
818            }
819
820            if status.is_server_error() {
821                let body = response.text().await.unwrap_or_default();
822                log::error!("Vertex AI (Claude) server error status={status} body={body}");
823                yield Ok(StreamDelta::Error {
824                    message: body,
825                    kind: StreamErrorKind::ServerError,
826                });
827                return;
828            }
829
830            if status.is_client_error() {
831                let body = response.text().await.unwrap_or_default();
832                log::warn!("Vertex AI (Claude) client error status={status} body={body}");
833                yield Ok(StreamDelta::Error {
834                    message: body,
835                    kind: StreamErrorKind::InvalidRequest,
836                });
837                return;
838            }
839
840            // Process SSE stream using the Anthropic SSE parser
841            let mut stream = response.bytes_stream();
842            let mut buffer = String::new();
843            let mut input_tokens: u32 = 0;
844            let mut output_tokens: u32 = 0;
845            let mut cached_input_tokens: u32 = 0;
846            let mut cache_creation_input_tokens: u32 = 0;
847            let mut tool_ids: std::collections::HashMap<usize, String> =
848                std::collections::HashMap::new();
849            let mut received_message_stop = false;
850            let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
851
852            while let Some(chunk_result) = stream.next().await {
853                let chunk = match chunk_result {
854                    Ok(c) => c,
855                    Err(e) => {
856                        // Include the cause so 401 detection / diagnostics survive.
857                        yield Err(anyhow::anyhow!("stream error: {e}"));
858                        return;
859                    }
860                };
861
862                buffer.push_str(&String::from_utf8_lossy(&chunk));
863
864                // Process complete SSE events (terminated by a blank line)
865                while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
866                    if anthropic_data::is_message_stop_event(&event_block) {
867                        received_message_stop = true;
868                    }
869
870                    if let Some(delta) = anthropic_data::parse_sse_event(
871                        &event_block,
872                        &mut input_tokens,
873                        &mut output_tokens,
874                        &mut cached_input_tokens,
875                        &mut cache_creation_input_tokens,
876                        &mut tool_ids,
877                        &mut pending_stop_reason,
878                    ) {
879                        yield Ok(delta);
880                    }
881                    if anthropic_data::is_message_stop_event(&event_block) {
882                        yield Ok(StreamDelta::Done {
883                            stop_reason: pending_stop_reason.take(),
884                        });
885                    }
886                }
887            }
888
889            // Process remaining buffer
890            let remaining = buffer.trim();
891            if !remaining.is_empty() {
892                if anthropic_data::is_message_stop_event(remaining) {
893                    received_message_stop = true;
894                }
895
896                if let Some(delta) = anthropic_data::parse_sse_event(
897                    remaining,
898                    &mut input_tokens,
899                    &mut output_tokens,
900                    &mut cached_input_tokens,
901                    &mut cache_creation_input_tokens,
902                    &mut tool_ids,
903                    &mut pending_stop_reason,
904                ) {
905                    yield Ok(delta);
906                }
907                if anthropic_data::is_message_stop_event(remaining) {
908                    yield Ok(StreamDelta::Done {
909                        stop_reason: pending_stop_reason.take(),
910                    });
911                }
912            }
913
914            if !received_message_stop {
915                log::warn!(
916                    "Vertex AI (Claude) SSE stream ended without message_stop"
917                );
918                yield Ok(StreamDelta::Error {
919                    message: "Stream ended unexpectedly without completion".to_string(),
920                    kind: StreamErrorKind::ServerError,
921                });
922            }
923        })
924    }
925}
926
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `VertexProvider` for the given project, region, and model,
    /// always passing the placeholder access token `"token"`.
    fn provider_for(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            String::from("token"),
            String::from(project),
            String::from(region),
            String::from(model),
        )
    }

    /// Asserts that `model` refuses a fixed 10 000-token thinking budget and
    /// that the resulting error message mentions `ThinkingConfig::adaptive()`.
    ///
    /// Marked `#[track_caller]` so a failure is reported at the calling test.
    #[track_caller]
    fn assert_rejects_budgeted_thinking(model: &str) {
        let provider = provider_for("project", "global", model);
        let budgeted = ThinkingConfig::new(10_000);

        let message = provider
            .validate_thinking_config(Some(&budgeted))
            .unwrap_err()
            .to_string();
        assert!(message.contains("ThinkingConfig::adaptive()"));
    }

    /// Builds a minimal single-message `ChatRequest` whose output budget is
    /// `max_tokens`; `explicit` is stored in `max_tokens_explicit`.
    fn request_with_max_tokens(max_tokens: u32, explicit: bool) -> ChatRequest {
        let messages = vec![agent_sdk_foundation::llm::Message::user("hi")];
        ChatRequest {
            max_tokens,
            max_tokens_explicit: explicit,
            messages,
            system: String::new(),
            tools: None,
            tool_choice: None,
            thinking: None,
            response_format: None,
            session_id: None,
            cached_content: None,
            cache: None,
        }
    }

    // --- Construction -----------------------------------------------------

    #[test]
    fn test_new_creates_provider() {
        let provider = provider_for("my-project", "us-central1", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let provider = VertexProvider::flash(
            String::from("token"),
            String::from("my-project"),
            String::from("us-central1"),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let provider = VertexProvider::pro(
            String::from("token"),
            String::from("my-project"),
            String::from("us-central1"),
        );

        assert_eq!(provider.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let original = provider_for("my-project", "us-central1", "test-model");
        let duplicate = original.clone();

        assert_eq!(original.model(), duplicate.model());
        assert_eq!(original.provider(), duplicate.provider());
    }

    // --- Publisher detection ----------------------------------------------

    #[test]
    fn test_is_claude_model() {
        let claude = provider_for("project", "us-central1", "claude-sonnet-4-20250514");
        assert!(claude.is_claude_model());

        let gemini = provider_for("project", "us-central1", "gemini-3-flash-preview");
        assert!(!gemini.is_claude_model());
    }

    // --- Endpoint URLs ----------------------------------------------------

    #[test]
    fn test_base_url_gemini() {
        let url = provider_for("my-project", "us-central1", "gemini-3-flash-preview")
            .base_url("google");

        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let url = provider_for("my-project", "us-central1", "claude-sonnet-4-20250514")
            .base_url("anthropic");

        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let url = provider_for("other-project", "europe-west4", "gemini-3.1-pro-preview")
            .base_url("google");

        // Each URL component is checked on its own rather than pinning the
        // whole string.
        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let url = provider_for("my-project", "global", "gemini-3.1-pro-preview")
            .base_url("google");

        assert_eq!(
            url,
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    // --- Thinking configuration validation --------------------------------

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_SONNET_46);
    }

    #[test]
    fn test_vertex_claude_opus_47_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_47);
    }

    #[test]
    fn test_vertex_claude_opus_48_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_48);
    }

    #[test]
    fn test_vertex_claude_fable_5_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_FABLE_5);
    }

    // --- Model constants --------------------------------------------------

    #[test]
    fn test_model_constants() {
        let expectations = [
            (MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview"),
            (MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview"),
            (MODEL_GEMINI_3_PRO, "gemini-3.0-pro"),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }

    // --- Output token budget ----------------------------------------------

    #[test]
    fn test_effective_max_tokens_honors_explicit_budget() {
        let provider = provider_for("project", "global", MODEL_SONNET_46);
        let request = request_with_max_tokens(1234, true);

        assert_eq!(provider.effective_max_tokens(&request), 1234);
    }

    #[test]
    fn test_effective_max_tokens_uses_clamped_default_when_implicit() {
        // With an implicit budget, a Claude-on-Vertex provider is expected to
        // use its own clamped default (at most 32k) instead of either the
        // unclamped capability ceiling or ChatRequest::DEFAULT_MAX_TOKENS.
        let provider = provider_for("project", "global", MODEL_SONNET_46);
        let request = request_with_max_tokens(4096, false);

        let effective = provider.effective_max_tokens(&request);

        assert_eq!(effective, provider.default_max_tokens());
        assert!(effective <= DEFAULT_SAFE_MAX_OUTPUT_TOKENS);
    }
}