Skip to main content

agent_sdk_providers/impls/
vertex.rs

1//! Google Vertex AI provider implementation.
2//!
3//! This module provides an implementation of `LlmProvider` for the Google Vertex AI
4//! platform. It supports both Gemini models (using the Gemini API format) and
5//! Claude models (using the Anthropic Messages API format via `rawPredict`).
6//!
7//! Publisher detection is automatic based on the model name:
8//! - `claude-*` models route to `publishers/anthropic` using `rawPredict`
9//! - All other models route to `publishers/google` using `generateContent`
10
11use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{
13    MODEL_FABLE_5, MODEL_OPUS_46, MODEL_OPUS_47, MODEL_OPUS_48, MODEL_SONNET_46,
14    data as anthropic_data,
15};
16use crate::impls::gemini::data::{
17    ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
18    ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
19    convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
20    stream_gemini_response,
21};
22use crate::provider::LlmProvider;
23use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
24use agent_sdk_foundation::llm::{
25    ChatOutcome, ChatRequest, ChatResponse, ResponseFormat, ThinkingConfig, ThinkingMode, Usage,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::StreamExt;
30use reqwest::StatusCode;
31
/// Vertex AI model id for Gemini 3 Flash (preview); selected by [`VertexProvider::flash`].
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Vertex AI model id for Gemini 3.1 Pro (preview); selected by [`VertexProvider::pro`].
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";

/// Legacy Gemini 3.0 Pro model id, kept for explicit opt-in.
///
/// The convenience constructors `flash` and `pro` do not select it; pass it
/// to [`VertexProvider::new`] explicitly to use it.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";
37
/// The Anthropic API version used for Claude models on Vertex AI.
///
/// Supplied as the `anthropic_version` field of every Claude request built in
/// this module (the model id itself travels in the URL, not the body).
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
/// Upper clamp bound for the *implicit* output-token budget computed by
/// `default_max_tokens`. Requests that set `max_tokens` explicitly are passed
/// through unclamped (see `effective_max_tokens`).
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

/// Connect timeout for the HTTP client (matches Anthropic).
const CONNECT_TIMEOUT_SECS: u64 = 30;
/// TCP keepalive interval to keep long streaming connections from dropping.
const TCP_KEEPALIVE_SECS: u64 = 30;
/// Per-request timeout for the **non-streaming** chat paths. Bounds a
/// black-holed endpoint so a single turn cannot hang the agent loop forever.
///
/// It is applied with `reqwest::RequestBuilder::timeout`, which covers the
/// whole request (connect through reading the full body), not just reads.
/// Streaming requests intentionally have no overall timeout.
const CHAT_READ_TIMEOUT_SECS: u64 = 300;

/// Cache-control marker used for Claude requests on Vertex AI.
///
/// Attached to the last user message and to the system prompt by the Claude
/// message/system-prompt builders on `VertexProvider`.
const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
    anthropic_data::ApiCacheControl::ephemeral()
}
54
/// Google Vertex AI LLM provider.
///
/// Uses the same Gemini request/response format as `GeminiProvider` but
/// authenticates via `OAuth2` Bearer tokens and routes through the Vertex AI
/// regional endpoint.
///
/// Claude models are also supported — the provider detects the publisher from
/// the model name and uses the appropriate API format automatically.
#[derive(Clone)]
pub struct VertexProvider {
    /// HTTP client built in `new` (connect timeout + TCP keepalive).
    client: reqwest::Client,
    /// `OAuth2` access token sent as a Bearer credential on every request.
    /// NOTE(review): no refresh logic is visible in this file; presumably the
    /// caller rebuilds the provider when the token expires — confirm.
    access_token: String,
    /// Google Cloud project id, embedded in the endpoint path.
    project_id: String,
    /// Vertex location; `"global"` selects the non-regional domain.
    region: String,
    /// Model id; a `claude-` prefix routes requests to the Anthropic publisher.
    model: String,
    /// Provider-level thinking configuration, set via `with_thinking`.
    thinking: Option<ThinkingConfig>,
}
72
73impl VertexProvider {
74    /// Create a new Vertex AI provider with full control over all parameters.
75    #[must_use]
76    pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
77        let client = reqwest::Client::builder()
78            .connect_timeout(std::time::Duration::from_secs(CONNECT_TIMEOUT_SECS))
79            .tcp_keepalive(std::time::Duration::from_secs(TCP_KEEPALIVE_SECS))
80            .build()
81            .unwrap_or_else(|error| {
82                log::warn!(
83                    "failed to build Vertex HTTP client with timeouts ({error}); using default client"
84                );
85                reqwest::Client::new()
86            });
87
88        Self {
89            client,
90            access_token,
91            project_id,
92            region,
93            model,
94            thinking: None,
95        }
96    }
97
98    /// Create a provider using Gemini 3 Flash Preview on Vertex AI.
99    #[must_use]
100    pub fn flash(access_token: String, project_id: String, region: String) -> Self {
101        Self::new(
102            access_token,
103            project_id,
104            region,
105            MODEL_GEMINI_3_FLASH.to_owned(),
106        )
107    }
108
109    /// Create a provider using Gemini 3.1 Pro Preview on Vertex AI.
110    #[must_use]
111    pub fn pro(access_token: String, project_id: String, region: String) -> Self {
112        Self::new(
113            access_token,
114            project_id,
115            region,
116            MODEL_GEMINI_31_PRO.to_owned(),
117        )
118    }
119
120    /// Detect whether the model is a Claude model (Anthropic publisher).
121    fn is_claude_model(&self) -> bool {
122        self.model.starts_with("claude-")
123    }
124
125    /// Build the base URL for the given publisher and model.
126    ///
127    /// For the `global` location the domain is `aiplatform.googleapis.com`
128    /// (no region prefix). Regional locations use `{region}-aiplatform.googleapis.com`.
129    fn base_url(&self, publisher: &str) -> String {
130        let domain = if self.region == "global" {
131            "aiplatform.googleapis.com".to_owned()
132        } else {
133            format!("{}-aiplatform.googleapis.com", self.region)
134        };
135        format!(
136            "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
137            domain = domain,
138            region = self.region,
139            project = self.project_id,
140            publisher = publisher,
141            model = self.model,
142        )
143    }
144
145    /// Set the provider-owned thinking configuration for this model.
146    #[must_use]
147    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
148        self.thinking = Some(thinking);
149        self
150    }
151
152    fn requires_anthropic_adaptive_thinking(&self) -> bool {
153        matches!(
154            self.model.as_str(),
155            MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
156        )
157    }
158
159    fn build_cached_vertex_claude_messages(
160        request: &ChatRequest,
161    ) -> Vec<anthropic_data::ApiMessage> {
162        let mut messages = anthropic_data::build_api_messages(request);
163        anthropic_data::apply_cache_control_to_last_user_message(
164            &mut messages,
165            vertex_cache_control(),
166        );
167        messages
168    }
169
170    fn build_vertex_claude_system_prompt(
171        system: &str,
172    ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
173        anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
174    }
175
176    /// Effective output-token budget for a request.
177    ///
178    /// Mirrors the Anthropic provider: an implicit budget falls back to the
179    /// provider/model default ([`default_max_tokens`](LlmProvider::default_max_tokens),
180    /// which clamps to Vertex's safe ceiling) instead of silently capping at
181    /// `ChatRequest::DEFAULT_MAX_TOKENS`.
182    fn effective_max_tokens(&self, request: &ChatRequest) -> u32 {
183        if request.max_tokens_explicit {
184            request.max_tokens
185        } else {
186            self.default_max_tokens()
187        }
188    }
189
190    fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
191        let content = anthropic_data::map_content_blocks(api_response.content);
192        let stop_reason = api_response
193            .stop_reason
194            .as_ref()
195            .map(anthropic_data::map_stop_reason);
196
197        ChatResponse {
198            id: api_response.id,
199            content,
200            model: api_response.model,
201            stop_reason,
202            usage: Usage {
203                input_tokens: api_response.usage.total_input_tokens(),
204                output_tokens: api_response.usage.output,
205                cached_input_tokens: api_response.usage.cached_input_tokens(),
206                cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
207            },
208        }
209    }
210}
211
212#[async_trait]
213impl LlmProvider for VertexProvider {
214    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
215        if self.is_claude_model() {
216            return self.chat_claude(request).await;
217        }
218        self.chat_gemini(request).await
219    }
220
221    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
222        if self.is_claude_model() {
223            return self.chat_stream_claude(request);
224        }
225        self.chat_stream_gemini(request)
226    }
227
228    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
229        let Some(thinking) = thinking else {
230            return Ok(());
231        };
232
233        if self
234            .capabilities()
235            .is_some_and(|caps| !caps.supports_thinking)
236        {
237            return Err(anyhow::anyhow!(
238                "thinking is not supported for provider={} model={}",
239                self.provider(),
240                self.model()
241            ));
242        }
243
244        if matches!(thinking.mode, ThinkingMode::Adaptive)
245            && !self
246                .capabilities()
247                .is_some_and(|caps| caps.supports_adaptive_thinking)
248        {
249            return Err(anyhow::anyhow!(
250                "adaptive thinking is not supported for provider={} model={}",
251                self.provider(),
252                self.model()
253            ));
254        }
255
256        if self.is_claude_model()
257            && self.requires_anthropic_adaptive_thinking()
258            && matches!(thinking.mode, ThinkingMode::Enabled { .. })
259        {
260            return Err(anyhow::anyhow!(
261                "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
262                self.provider(),
263                self.model()
264            ));
265        }
266
267        Ok(())
268    }
269
270    fn model(&self) -> &str {
271        &self.model
272    }
273
274    fn provider(&self) -> &'static str {
275        "vertex"
276    }
277
278    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
279        self.thinking.as_ref()
280    }
281
282    fn default_max_tokens(&self) -> u32 {
283        let provider = if self.is_claude_model() {
284            "anthropic"
285        } else {
286            "gemini"
287        };
288        let model_max = self
289            .capabilities()
290            .and_then(|caps| caps.max_output_tokens)
291            .or_else(|| {
292                crate::model_capabilities::default_max_output_tokens(provider, self.model())
293            })
294            .unwrap_or(4096);
295        model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
296    }
297}
298
299// ============================================================================
300// Gemini path (publishers/google)
301// ============================================================================
302
303impl VertexProvider {
304    #[allow(clippy::too_many_lines)]
305    async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
306        let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
307            Ok(thinking) => thinking,
308            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
309        };
310        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
311            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
312        }
313        let contents = build_api_contents(&request.messages);
314        let tools = request
315            .tools
316            .as_ref()
317            .map(|t| convert_tools_to_config(t.clone()));
318        let tool_config = request
319            .tool_choice
320            .as_ref()
321            .map(ApiFunctionCallingConfig::from_tool_choice);
322        let system_instruction = if request.system.is_empty() {
323            None
324        } else {
325            Some(ApiContent {
326                role: None,
327                parts: vec![ApiPart::Text {
328                    text: request.system.clone(),
329                    thought_signature: None,
330                }],
331            })
332        };
333
334        let thinking_config = thinking.as_ref().map(map_thinking_config);
335        let (response_mime_type, response_schema) =
336            request.response_format.as_ref().map_or((None, None), |rf| {
337                (
338                    Some("application/json"),
339                    Some(gemini_response_schema(&rf.schema)),
340                )
341            });
342
343        let max_tokens = self.effective_max_tokens(&request);
344        let api_request = ApiGenerateContentRequest {
345            contents: &contents,
346            system_instruction: system_instruction.as_ref(),
347            tools: tools.as_ref().map(std::slice::from_ref),
348            tool_config,
349            generation_config: Some(ApiGenerationConfig {
350                max_output_tokens: Some(max_tokens),
351                thinking_config,
352                response_mime_type,
353                response_schema,
354            }),
355            cached_content: request.cached_content.as_deref(),
356        };
357
358        log::debug!(
359            "Vertex AI LLM request model={} max_tokens={}",
360            self.model,
361            max_tokens
362        );
363
364        let url = format!("{}:generateContent", self.base_url("google"));
365
366        let response = self
367            .client
368            .post(&url)
369            .header("Content-Type", "application/json")
370            .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
371            .bearer_auth(&self.access_token)
372            .json(&api_request)
373            .send()
374            .await
375            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
376
377        let status = response.status();
378        let bytes = response
379            .bytes()
380            .await
381            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
382
383        log::debug!(
384            "Vertex AI LLM response status={} body_len={}",
385            status,
386            bytes.len()
387        );
388
389        if status == StatusCode::TOO_MANY_REQUESTS {
390            return Ok(ChatOutcome::RateLimited);
391        }
392
393        if status.is_server_error() {
394            let body = String::from_utf8_lossy(&bytes);
395            log::error!("Vertex AI server error status={status} body={body}");
396            return Ok(ChatOutcome::ServerError(body.into_owned()));
397        }
398
399        if status.is_client_error() {
400            let body = String::from_utf8_lossy(&bytes);
401            log::warn!("Vertex AI client error status={status} body={body}");
402            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
403        }
404
405        let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
406            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
407
408        let candidate = api_response
409            .candidates
410            .into_iter()
411            .next()
412            .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
413
414        let content = build_content_blocks(&candidate.content);
415
416        if content.is_empty() && !candidate.content.parts.is_empty() {
417            log::warn!(
418                "Vertex AI parts not converted to content blocks raw_parts={:?}",
419                candidate.content.parts
420            );
421        }
422
423        let has_tool_calls = content
424            .iter()
425            .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
426
427        let stop_reason = candidate
428            .finish_reason
429            .as_ref()
430            .map(|r| map_finish_reason(r, has_tool_calls));
431
432        let usage = api_response
433            .usage_metadata
434            .unwrap_or(ApiUsageMetadata {
435                prompt: 0,
436                candidates: 0,
437                cached_content: 0,
438            })
439            .into_usage();
440
441        Ok(ChatOutcome::Success(ChatResponse {
442            id: String::new(),
443            content,
444            model: self.model.clone(),
445            stop_reason,
446            usage,
447        }))
448    }
449
450    fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
451        Box::pin(async_stream::stream! {
452            let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
453                Ok(thinking) => thinking,
454                Err(error) => {
455                    yield Ok(StreamDelta::Error {
456                        message: error.to_string(),
457                        kind: StreamErrorKind::InvalidRequest,
458                    });
459                    return;
460                }
461            };
462            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
463                yield Ok(StreamDelta::Error {
464                    message: error.to_string(),
465                    kind: StreamErrorKind::InvalidRequest,
466                });
467                return;
468            }
469
470            let contents = build_api_contents(&request.messages);
471            let tools = request
472                .tools
473                .as_ref()
474                .map(|t| convert_tools_to_config(t.clone()));
475            let tool_config = request
476                .tool_choice
477                .as_ref()
478                .map(ApiFunctionCallingConfig::from_tool_choice);
479            let system_instruction = build_gemini_system_instruction(&request.system);
480            let thinking_config = thinking.as_ref().map(map_thinking_config);
481            let (response_mime_type, response_schema) =
482                gemini_response_format(request.response_format.as_ref());
483
484            let max_tokens = self.effective_max_tokens(&request);
485            let api_request = ApiGenerateContentRequest {
486                contents: &contents,
487                system_instruction: system_instruction.as_ref(),
488                tools: tools.as_ref().map(std::slice::from_ref),
489                tool_config,
490                generation_config: Some(ApiGenerationConfig {
491                    max_output_tokens: Some(max_tokens),
492                    thinking_config,
493                    response_mime_type,
494                    response_schema,
495                }),
496                cached_content: request.cached_content.as_deref(),
497            };
498
499            log::debug!(
500                "Vertex AI streaming LLM request model={} max_tokens={}",
501                self.model,
502                max_tokens
503            );
504
505            let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
506
507            let response = match self.send_gemini_stream_request(&url, &api_request).await {
508                Ok(response) => response,
509                Err(item) => {
510                    yield item;
511                    return;
512                }
513            };
514
515            let mut inner = stream_gemini_response(response);
516            while let Some(item) = futures::StreamExt::next(&mut inner).await {
517                yield item;
518            }
519        })
520    }
521
522    /// Issue the Vertex Gemini streaming request, returning the raw response on
523    /// success or a ready-to-`yield` stream item describing the failure.
524    ///
525    /// The `Err` payload is the exact `StreamBox` item to `yield`: an
526    /// `anyhow::Error` for a transport failure, or a classified
527    /// [`StreamDelta::Error`] for a non-success HTTP status. This keeps the
528    /// generator's failure handling to a single `yield`.
529    async fn send_gemini_stream_request(
530        &self,
531        url: &str,
532        api_request: &ApiGenerateContentRequest<'_>,
533    ) -> Result<reqwest::Response, anyhow::Result<StreamDelta>> {
534        let response = match self
535            .client
536            .post(url)
537            .header("Content-Type", "application/json")
538            .bearer_auth(&self.access_token)
539            .json(api_request)
540            .send()
541            .await
542        {
543            Ok(response) => response,
544            // Include the cause so 401 detection / diagnostics survive.
545            Err(e) => return Err(Err(anyhow::anyhow!("request failed: {e}"))),
546        };
547
548        let status = response.status();
549        if !status.is_success() {
550            let body = response.text().await.unwrap_or_default();
551            let kind = if status == StatusCode::TOO_MANY_REQUESTS {
552                StreamErrorKind::RateLimited
553            } else if status.is_server_error() {
554                StreamErrorKind::ServerError
555            } else {
556                StreamErrorKind::InvalidRequest
557            };
558            log::warn!("Vertex AI error status={status} body={body}");
559            return Err(Ok(StreamDelta::Error {
560                message: body,
561                kind,
562            }));
563        }
564
565        Ok(response)
566    }
567}
568
569/// Build the Gemini `system_instruction` content from the request system prompt,
570/// or `None` when the prompt is empty.
571fn build_gemini_system_instruction(system: &str) -> Option<ApiContent> {
572    if system.is_empty() {
573        None
574    } else {
575        Some(ApiContent {
576            role: None,
577            parts: vec![ApiPart::Text {
578                text: system.to_owned(),
579                thought_signature: None,
580            }],
581        })
582    }
583}
584
585/// Map an optional response format into Gemini's `(responseMimeType,
586/// responseSchema)` pair, sanitizing the schema to the subset Gemini accepts.
587fn gemini_response_format(
588    response_format: Option<&ResponseFormat>,
589) -> (Option<&'static str>, Option<serde_json::Value>) {
590    response_format.map_or((None, None), |rf| {
591        (
592            Some("application/json"),
593            Some(gemini_response_schema(&rf.schema)),
594        )
595    })
596}
597
598// ============================================================================
599// Claude path (publishers/anthropic)
600// ============================================================================
601
602impl VertexProvider {
603    async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
604        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
605            Ok(thinking) => thinking,
606            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
607        };
608        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
609            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
610        }
611        let messages = Self::build_cached_vertex_claude_messages(&request);
612        let tools = anthropic_data::build_api_tools(&request);
613        let thinking = thinking_config
614            .as_ref()
615            .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
616        let output_config = thinking_config
617            .as_ref()
618            .and_then(|t| t.effort)
619            .map(|effort| anthropic_data::ApiOutputConfig { effort });
620        let system = Self::build_vertex_claude_system_prompt(&request.system);
621        let tool_choice = request
622            .tool_choice
623            .as_ref()
624            .map(anthropic_data::ApiToolChoice::from_tool_choice);
625
626        let max_tokens = self.effective_max_tokens(&request);
627        let api_request = anthropic_data::ApiMessagesRequest {
628            model: None, // model is in the URL for Vertex
629            max_tokens,
630            system,
631            messages: &messages,
632            tools: tools.as_deref(),
633            tool_choice,
634            stream: false,
635            thinking,
636            output_config,
637            anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
638        };
639
640        log::debug!(
641            "Vertex AI (Claude) LLM request model={} max_tokens={}",
642            self.model,
643            max_tokens
644        );
645
646        if log::log_enabled!(log::Level::Debug) {
647            match serde_json::to_string_pretty(&api_request) {
648                Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
649                Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
650            }
651        }
652
653        let url = format!("{}:rawPredict", self.base_url("anthropic"));
654
655        let response = self
656            .client
657            .post(&url)
658            .header("Content-Type", "application/json")
659            .timeout(std::time::Duration::from_secs(CHAT_READ_TIMEOUT_SECS))
660            .bearer_auth(&self.access_token)
661            .json(&api_request)
662            .send()
663            .await
664            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
665
666        let status = response.status();
667        let bytes = response
668            .bytes()
669            .await
670            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
671
672        log::debug!(
673            "Vertex AI (Claude) response status={} body_len={}",
674            status,
675            bytes.len()
676        );
677
678        if status == StatusCode::TOO_MANY_REQUESTS {
679            return Ok(ChatOutcome::RateLimited);
680        }
681
682        if status.is_server_error() {
683            let body = String::from_utf8_lossy(&bytes);
684            log::error!("Vertex AI (Claude) server error status={status} body={body}");
685            return Ok(ChatOutcome::ServerError(body.into_owned()));
686        }
687
688        if status.is_client_error() {
689            let body = String::from_utf8_lossy(&bytes);
690            log::warn!("Vertex AI (Claude) client error status={status} body={body}");
691            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
692        }
693
694        let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
695            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
696
697        log::debug!(
698            "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
699            api_response.id,
700            api_response.model,
701            api_response.stop_reason,
702            api_response.usage.total_input_tokens(),
703            api_response.usage.output,
704            api_response.content.len()
705        );
706
707        Ok(ChatOutcome::Success(Self::map_claude_response(
708            api_response,
709        )))
710    }
711
712    #[allow(clippy::too_many_lines)]
713    fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
714        Box::pin(async_stream::stream! {
715            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
716                Ok(thinking) => thinking,
717                Err(error) => {
718                    yield Ok(StreamDelta::Error {
719                        message: error.to_string(),
720                        kind: StreamErrorKind::InvalidRequest,
721                    });
722                    return;
723                }
724            };
725            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
726                yield Ok(StreamDelta::Error {
727                    message: error.to_string(),
728                    kind: StreamErrorKind::InvalidRequest,
729                });
730                return;
731            }
732            let messages = Self::build_cached_vertex_claude_messages(&request);
733            let tools = anthropic_data::build_api_tools(&request);
734            let thinking = thinking_config
735                .as_ref()
736                .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
737            let output_config = thinking_config
738                .as_ref()
739                .and_then(|t| t.effort)
740                .map(|effort| anthropic_data::ApiOutputConfig { effort });
741            let system = Self::build_vertex_claude_system_prompt(&request.system);
742            let tool_choice = request
743                .tool_choice
744                .as_ref()
745                .map(anthropic_data::ApiToolChoice::from_tool_choice);
746
747            let max_tokens = self.effective_max_tokens(&request);
748            let api_request = anthropic_data::ApiMessagesRequest {
749                model: None, // model is in the URL for Vertex
750                max_tokens,
751                system,
752                messages: &messages,
753                tools: tools.as_deref(),
754                tool_choice,
755                stream: true,
756                thinking,
757                output_config,
758                anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
759            };
760
761            log::debug!(
762                "Vertex AI (Claude) streaming request model={} max_tokens={}",
763                self.model,
764                max_tokens
765            );
766
767            if log::log_enabled!(log::Level::Debug) {
768                match serde_json::to_string_pretty(&api_request) {
769                    Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
770                    Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
771                }
772            }
773
774            let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
775
776            let response = match self
777                .client
778                .post(&url)
779                .header("Content-Type", "application/json")
780                .bearer_auth(&self.access_token)
781                .json(&api_request)
782                .send()
783                .await
784            {
785                Ok(r) => r,
786                Err(e) => {
787                    yield Err(anyhow::anyhow!("request failed: {e}"));
788                    return;
789                }
790            };
791
792            let status = response.status();
793
794            if status == StatusCode::TOO_MANY_REQUESTS {
795                yield Ok(StreamDelta::Error {
796                    message: "Rate limited".to_string(),
797                    kind: StreamErrorKind::RateLimited,
798                });
799                return;
800            }
801
802            if status.is_server_error() {
803                let body = response.text().await.unwrap_or_default();
804                log::error!("Vertex AI (Claude) server error status={status} body={body}");
805                yield Ok(StreamDelta::Error {
806                    message: body,
807                    kind: StreamErrorKind::ServerError,
808                });
809                return;
810            }
811
812            if status.is_client_error() {
813                let body = response.text().await.unwrap_or_default();
814                log::warn!("Vertex AI (Claude) client error status={status} body={body}");
815                yield Ok(StreamDelta::Error {
816                    message: body,
817                    kind: StreamErrorKind::InvalidRequest,
818                });
819                return;
820            }
821
822            // Process SSE stream using the Anthropic SSE parser
823            let mut stream = response.bytes_stream();
824            let mut buffer = String::new();
825            let mut input_tokens: u32 = 0;
826            let mut output_tokens: u32 = 0;
827            let mut cached_input_tokens: u32 = 0;
828            let mut cache_creation_input_tokens: u32 = 0;
829            let mut tool_ids: std::collections::HashMap<usize, String> =
830                std::collections::HashMap::new();
831            let mut received_message_stop = false;
832            let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
833
834            while let Some(chunk_result) = stream.next().await {
835                let chunk = match chunk_result {
836                    Ok(c) => c,
837                    Err(e) => {
838                        // Include the cause so 401 detection / diagnostics survive.
839                        yield Err(anyhow::anyhow!("stream error: {e}"));
840                        return;
841                    }
842                };
843
844                buffer.push_str(&String::from_utf8_lossy(&chunk));
845
846                // Process complete SSE events (terminated by a blank line)
847                while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
848                    if anthropic_data::is_message_stop_event(&event_block) {
849                        received_message_stop = true;
850                    }
851
852                    if let Some(delta) = anthropic_data::parse_sse_event(
853                        &event_block,
854                        &mut input_tokens,
855                        &mut output_tokens,
856                        &mut cached_input_tokens,
857                        &mut cache_creation_input_tokens,
858                        &mut tool_ids,
859                        &mut pending_stop_reason,
860                    ) {
861                        yield Ok(delta);
862                    }
863                    if anthropic_data::is_message_stop_event(&event_block) {
864                        yield Ok(StreamDelta::Done {
865                            stop_reason: pending_stop_reason.take(),
866                        });
867                    }
868                }
869            }
870
871            // Process remaining buffer
872            let remaining = buffer.trim();
873            if !remaining.is_empty() {
874                if anthropic_data::is_message_stop_event(remaining) {
875                    received_message_stop = true;
876                }
877
878                if let Some(delta) = anthropic_data::parse_sse_event(
879                    remaining,
880                    &mut input_tokens,
881                    &mut output_tokens,
882                    &mut cached_input_tokens,
883                    &mut cache_creation_input_tokens,
884                    &mut tool_ids,
885                    &mut pending_stop_reason,
886                ) {
887                    yield Ok(delta);
888                }
889                if anthropic_data::is_message_stop_event(remaining) {
890                    yield Ok(StreamDelta::Done {
891                        stop_reason: pending_stop_reason.take(),
892                    });
893                }
894            }
895
896            if !received_message_stop {
897                log::warn!(
898                    "Vertex AI (Claude) SSE stream ended without message_stop"
899                );
900                yield Ok(StreamDelta::Error {
901                    message: "Stream ended unexpectedly without completion".to_string(),
902                    kind: StreamErrorKind::ServerError,
903                });
904            }
905        })
906    }
907}
908
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a provider with a placeholder access token.
    ///
    /// Only the routing inputs (project, region, model) vary between tests, so
    /// the token is fixed to keep each test focused on what it checks.
    fn make_provider(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_string(),
            project.to_string(),
            region.to_string(),
            model.to_string(),
        )
    }

    /// Asserts that a budgeted thinking config (`ThinkingConfig::new`) is
    /// refused for `model`, and that the error message steers the caller
    /// towards `ThinkingConfig::adaptive()`.
    fn assert_rejects_budgeted_thinking(model: &str) {
        let provider = make_provider("project", "global", model);
        let budgeted = ThinkingConfig::new(10_000);

        let err = provider
            .validate_thinking_config(Some(&budgeted))
            .unwrap_err();
        assert!(err.to_string().contains("ThinkingConfig::adaptive()"));
    }

    /// Builds a minimal single-message request.
    ///
    /// The only fields that matter here are the output-token budget and
    /// whether the caller set that budget explicitly.
    fn request_with_max_tokens(max_tokens: u32, explicit: bool) -> ChatRequest {
        ChatRequest {
            max_tokens,
            max_tokens_explicit: explicit,
            system: String::new(),
            messages: vec![agent_sdk_foundation::llm::Message::user("hi")],
            tools: None,
            session_id: None,
            cached_content: None,
            thinking: None,
            tool_choice: None,
            response_format: None,
        }
    }

    #[test]
    fn test_new_creates_provider() {
        let provider = make_provider("my-project", "us-central1", "custom-model");

        assert_eq!(provider.model(), "custom-model");
        assert_eq!(provider.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let flash = VertexProvider::flash(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(flash.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(flash.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let pro = VertexProvider::pro(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(pro.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(pro.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let original = make_provider("my-project", "us-central1", "test-model");
        let copy = original.clone();

        assert_eq!(original.model(), copy.model());
        assert_eq!(original.provider(), copy.provider());
    }

    #[test]
    fn test_is_claude_model() {
        // A `claude-*` name routes to the Anthropic publisher...
        let claude = make_provider("project", "us-central1", "claude-sonnet-4-20250514");
        assert!(claude.is_claude_model());

        // ...while everything else stays on the Google publisher.
        let gemini = make_provider("project", "us-central1", "gemini-3-flash-preview");
        assert!(!gemini.is_claude_model());
    }

    #[test]
    fn test_base_url_gemini() {
        let provider = make_provider("my-project", "us-central1", "gemini-3-flash-preview");
        let url = provider.base_url("google");

        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let provider = make_provider("my-project", "us-central1", "claude-sonnet-4-20250514");
        let url = provider.base_url("anthropic");

        assert_eq!(
            url,
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let provider = make_provider("other-project", "europe-west4", "gemini-3.1-pro-preview");
        let url = provider.base_url("google");

        // The region appears both in the host prefix and in the resource path.
        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let provider = make_provider("my-project", "global", "gemini-3.1-pro-preview");
        let url = provider.base_url("google");

        assert_eq!(
            url,
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_SONNET_46);
    }

    #[test]
    fn test_vertex_claude_opus_47_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_47);
    }

    #[test]
    fn test_vertex_claude_opus_48_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_OPUS_48);
    }

    #[test]
    fn test_vertex_claude_fable_5_rejects_budgeted_thinking() {
        assert_rejects_budgeted_thinking(MODEL_FABLE_5);
    }

    #[test]
    fn test_model_constants() {
        let expectations = [
            (MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview"),
            (MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview"),
            (MODEL_GEMINI_3_PRO, "gemini-3.0-pro"),
        ];
        for &(constant, expected) in &expectations {
            assert_eq!(constant, expected);
        }
    }

    #[test]
    fn test_effective_max_tokens_honors_explicit_budget() {
        let provider = make_provider("project", "global", MODEL_SONNET_46);
        let explicit_request = request_with_max_tokens(1234, true);

        assert_eq!(provider.effective_max_tokens(&explicit_request), 1234);
    }

    #[test]
    fn test_effective_max_tokens_uses_clamped_default_when_implicit() {
        // An implicit budget for a Claude-on-Vertex model must fall back to the
        // clamped default (<= 32k), not the unclamped capability ceiling and
        // not ChatRequest::DEFAULT_MAX_TOKENS.
        let provider = make_provider("project", "global", MODEL_SONNET_46);
        let implicit_request = request_with_max_tokens(4096, false);

        let effective = provider.effective_max_tokens(&implicit_request);
        assert_eq!(effective, provider.default_max_tokens());
        assert!(effective <= DEFAULT_SAFE_MAX_OUTPUT_TOKENS);
    }
}