Skip to main content

agent_sdk_providers/impls/
vertex.rs

1//! Google Vertex AI provider implementation.
2//!
3//! This module provides an implementation of `LlmProvider` for the Google Vertex AI
4//! platform. It supports both Gemini models (using the Gemini API format) and
5//! Claude models (using the Anthropic Messages API format via `rawPredict`).
6//!
7//! Publisher detection is automatic based on the model name:
8//! - `claude-*` models route to `publishers/anthropic` using `rawPredict`
9//! - All other models route to `publishers/google` using `generateContent`
10
11use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{
13    MODEL_FABLE_5, MODEL_OPUS_46, MODEL_OPUS_47, MODEL_OPUS_48, MODEL_SONNET_46,
14    data as anthropic_data,
15};
16use crate::impls::gemini::data::{
17    ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
18    ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
19    convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
20    stream_gemini_response,
21};
22use crate::provider::LlmProvider;
23use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
24use agent_sdk_foundation::llm::{
25    ChatOutcome, ChatRequest, ChatResponse, ThinkingConfig, ThinkingMode, Usage,
26};
27use anyhow::Result;
28use async_trait::async_trait;
29use futures::StreamExt;
30use reqwest::StatusCode;
31
/// Gemini 3.1 Pro Preview model identifier (the model selected by [`VertexProvider::pro`]).
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";

/// Gemini 3 Flash Preview model identifier (the model selected by [`VertexProvider::flash`]).
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";

/// Legacy Gemini 3.0 Pro model identifier, kept for explicit opt-in.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";

/// Upper bound applied to the default output-token budget reported by the provider.
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;

/// The Anthropic API version sent with Claude requests on Vertex AI.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
41
42const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
43    anthropic_data::ApiCacheControl::ephemeral()
44}
45
46/// Google Vertex AI LLM provider.
47///
48/// Uses the same Gemini request/response format as `GeminiProvider` but
49/// authenticates via `OAuth2` Bearer tokens and routes through the Vertex AI
50/// regional endpoint.
51///
52/// Claude models are also supported — the provider detects the publisher from
53/// the model name and uses the appropriate API format automatically.
54#[derive(Clone)]
55pub struct VertexProvider {
56    client: reqwest::Client,
57    access_token: String,
58    project_id: String,
59    region: String,
60    model: String,
61    thinking: Option<ThinkingConfig>,
62}
63
64impl VertexProvider {
65    /// Create a new Vertex AI provider with full control over all parameters.
66    #[must_use]
67    pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
68        let client = reqwest::Client::builder()
69            .connect_timeout(std::time::Duration::from_secs(30))
70            .tcp_keepalive(std::time::Duration::from_secs(30))
71            .build()
72            .unwrap_or_default();
73
74        Self {
75            client,
76            access_token,
77            project_id,
78            region,
79            model,
80            thinking: None,
81        }
82    }
83
84    /// Create a provider using Gemini 3 Flash Preview on Vertex AI.
85    #[must_use]
86    pub fn flash(access_token: String, project_id: String, region: String) -> Self {
87        Self::new(
88            access_token,
89            project_id,
90            region,
91            MODEL_GEMINI_3_FLASH.to_owned(),
92        )
93    }
94
95    /// Create a provider using Gemini 3.1 Pro Preview on Vertex AI.
96    #[must_use]
97    pub fn pro(access_token: String, project_id: String, region: String) -> Self {
98        Self::new(
99            access_token,
100            project_id,
101            region,
102            MODEL_GEMINI_31_PRO.to_owned(),
103        )
104    }
105
106    /// Detect whether the model is a Claude model (Anthropic publisher).
107    fn is_claude_model(&self) -> bool {
108        self.model.starts_with("claude-")
109    }
110
111    /// Build the base URL for the given publisher and model.
112    ///
113    /// For the `global` location the domain is `aiplatform.googleapis.com`
114    /// (no region prefix). Regional locations use `{region}-aiplatform.googleapis.com`.
115    fn base_url(&self, publisher: &str) -> String {
116        let domain = if self.region == "global" {
117            "aiplatform.googleapis.com".to_owned()
118        } else {
119            format!("{}-aiplatform.googleapis.com", self.region)
120        };
121        format!(
122            "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
123            domain = domain,
124            region = self.region,
125            project = self.project_id,
126            publisher = publisher,
127            model = self.model,
128        )
129    }
130
131    /// Set the provider-owned thinking configuration for this model.
132    #[must_use]
133    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
134        self.thinking = Some(thinking);
135        self
136    }
137
138    fn requires_anthropic_adaptive_thinking(&self) -> bool {
139        matches!(
140            self.model.as_str(),
141            MODEL_SONNET_46 | MODEL_OPUS_46 | MODEL_OPUS_47 | MODEL_OPUS_48 | MODEL_FABLE_5
142        )
143    }
144
145    fn build_cached_vertex_claude_messages(
146        request: &ChatRequest,
147    ) -> Vec<anthropic_data::ApiMessage> {
148        let mut messages = anthropic_data::build_api_messages(request);
149        anthropic_data::apply_cache_control_to_last_user_message(
150            &mut messages,
151            vertex_cache_control(),
152        );
153        messages
154    }
155
156    fn build_vertex_claude_system_prompt(
157        system: &str,
158    ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
159        anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
160    }
161
162    fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
163        let content = anthropic_data::map_content_blocks(api_response.content);
164        let stop_reason = api_response
165            .stop_reason
166            .as_ref()
167            .map(anthropic_data::map_stop_reason);
168
169        ChatResponse {
170            id: api_response.id,
171            content,
172            model: api_response.model,
173            stop_reason,
174            usage: Usage {
175                input_tokens: api_response.usage.total_input_tokens(),
176                output_tokens: api_response.usage.output,
177                cached_input_tokens: api_response.usage.cached_input_tokens(),
178                cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
179            },
180        }
181    }
182}
183
184#[async_trait]
185impl LlmProvider for VertexProvider {
186    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
187        if self.is_claude_model() {
188            return self.chat_claude(request).await;
189        }
190        self.chat_gemini(request).await
191    }
192
193    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
194        if self.is_claude_model() {
195            return self.chat_stream_claude(request);
196        }
197        self.chat_stream_gemini(request)
198    }
199
200    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
201        let Some(thinking) = thinking else {
202            return Ok(());
203        };
204
205        if self
206            .capabilities()
207            .is_some_and(|caps| !caps.supports_thinking)
208        {
209            return Err(anyhow::anyhow!(
210                "thinking is not supported for provider={} model={}",
211                self.provider(),
212                self.model()
213            ));
214        }
215
216        if matches!(thinking.mode, ThinkingMode::Adaptive)
217            && !self
218                .capabilities()
219                .is_some_and(|caps| caps.supports_adaptive_thinking)
220        {
221            return Err(anyhow::anyhow!(
222                "adaptive thinking is not supported for provider={} model={}",
223                self.provider(),
224                self.model()
225            ));
226        }
227
228        if self.is_claude_model()
229            && self.requires_anthropic_adaptive_thinking()
230            && matches!(thinking.mode, ThinkingMode::Enabled { .. })
231        {
232            return Err(anyhow::anyhow!(
233                "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
234                self.provider(),
235                self.model()
236            ));
237        }
238
239        Ok(())
240    }
241
242    fn model(&self) -> &str {
243        &self.model
244    }
245
246    fn provider(&self) -> &'static str {
247        "vertex"
248    }
249
250    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
251        self.thinking.as_ref()
252    }
253
254    fn default_max_tokens(&self) -> u32 {
255        let provider = if self.is_claude_model() {
256            "anthropic"
257        } else {
258            "gemini"
259        };
260        let model_max = self
261            .capabilities()
262            .and_then(|caps| caps.max_output_tokens)
263            .or_else(|| {
264                crate::model_capabilities::default_max_output_tokens(provider, self.model())
265            })
266            .unwrap_or(4096);
267        model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
268    }
269}
270
271// ============================================================================
272// Gemini path (publishers/google)
273// ============================================================================
274
275impl VertexProvider {
276    #[allow(clippy::too_many_lines)]
277    async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
278        let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
279            Ok(thinking) => thinking,
280            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
281        };
282        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
283            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
284        }
285        let contents = build_api_contents(&request.messages);
286        let tools = request.tools.map(convert_tools_to_config);
287        let tool_config = request
288            .tool_choice
289            .as_ref()
290            .map(ApiFunctionCallingConfig::from_tool_choice);
291        let system_instruction = if request.system.is_empty() {
292            None
293        } else {
294            Some(ApiContent {
295                role: None,
296                parts: vec![ApiPart::Text {
297                    text: request.system.clone(),
298                    thought_signature: None,
299                }],
300            })
301        };
302
303        let thinking_config = thinking.as_ref().map(map_thinking_config);
304        let (response_mime_type, response_schema) =
305            request.response_format.as_ref().map_or((None, None), |rf| {
306                (
307                    Some("application/json"),
308                    Some(gemini_response_schema(&rf.schema)),
309                )
310            });
311
312        let api_request = ApiGenerateContentRequest {
313            contents: &contents,
314            system_instruction: system_instruction.as_ref(),
315            tools: tools.as_ref().map(std::slice::from_ref),
316            tool_config,
317            generation_config: Some(ApiGenerationConfig {
318                max_output_tokens: Some(request.max_tokens),
319                thinking_config,
320                response_mime_type,
321                response_schema,
322            }),
323            cached_content: request.cached_content.as_deref(),
324        };
325
326        log::debug!(
327            "Vertex AI LLM request model={} max_tokens={}",
328            self.model,
329            request.max_tokens
330        );
331
332        let url = format!("{}:generateContent", self.base_url("google"));
333
334        let response = self
335            .client
336            .post(&url)
337            .header("Content-Type", "application/json")
338            .bearer_auth(&self.access_token)
339            .json(&api_request)
340            .send()
341            .await
342            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
343
344        let status = response.status();
345        let bytes = response
346            .bytes()
347            .await
348            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
349
350        log::debug!(
351            "Vertex AI LLM response status={} body_len={}",
352            status,
353            bytes.len()
354        );
355
356        if status == StatusCode::TOO_MANY_REQUESTS {
357            return Ok(ChatOutcome::RateLimited);
358        }
359
360        if status.is_server_error() {
361            let body = String::from_utf8_lossy(&bytes);
362            log::error!("Vertex AI server error status={status} body={body}");
363            return Ok(ChatOutcome::ServerError(body.into_owned()));
364        }
365
366        if status.is_client_error() {
367            let body = String::from_utf8_lossy(&bytes);
368            log::warn!("Vertex AI client error status={status} body={body}");
369            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
370        }
371
372        let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
373            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
374
375        let candidate = api_response
376            .candidates
377            .into_iter()
378            .next()
379            .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
380
381        let content = build_content_blocks(&candidate.content);
382
383        if content.is_empty() && !candidate.content.parts.is_empty() {
384            log::warn!(
385                "Vertex AI parts not converted to content blocks raw_parts={:?}",
386                candidate.content.parts
387            );
388        }
389
390        let has_tool_calls = content
391            .iter()
392            .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
393
394        let stop_reason = candidate
395            .finish_reason
396            .as_ref()
397            .map(|r| map_finish_reason(r, has_tool_calls));
398
399        let usage = api_response
400            .usage_metadata
401            .unwrap_or(ApiUsageMetadata {
402                prompt: 0,
403                candidates: 0,
404                cached_content: 0,
405            })
406            .into_usage();
407
408        Ok(ChatOutcome::Success(ChatResponse {
409            id: String::new(),
410            content,
411            model: self.model.clone(),
412            stop_reason,
413            usage,
414        }))
415    }
416
417    fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
418        Box::pin(async_stream::stream! {
419            let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
420                Ok(thinking) => thinking,
421                Err(error) => {
422                    yield Ok(StreamDelta::Error {
423                        message: error.to_string(),
424                        kind: StreamErrorKind::InvalidRequest,
425                    });
426                    return;
427                }
428            };
429            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
430                yield Ok(StreamDelta::Error {
431                    message: error.to_string(),
432                    kind: StreamErrorKind::InvalidRequest,
433                });
434                return;
435            }
436            let contents = build_api_contents(&request.messages);
437            let tools = request.tools.map(convert_tools_to_config);
438            let tool_config = request
439                .tool_choice
440                .as_ref()
441                .map(ApiFunctionCallingConfig::from_tool_choice);
442            let system_instruction = if request.system.is_empty() {
443                None
444            } else {
445                Some(ApiContent {
446                    role: None,
447                    parts: vec![ApiPart::Text {
448                        text: request.system.clone(),
449                        thought_signature: None,
450                    }],
451                })
452            };
453
454            let thinking_config = thinking.as_ref().map(map_thinking_config);
455            let (response_mime_type, response_schema) = request
456                .response_format
457                .as_ref()
458                .map_or((None, None), |rf| {
459                    (
460                        Some("application/json"),
461                        Some(gemini_response_schema(&rf.schema)),
462                    )
463                });
464
465            let api_request = ApiGenerateContentRequest {
466                contents: &contents,
467                system_instruction: system_instruction.as_ref(),
468                tools: tools.as_ref().map(std::slice::from_ref),
469                tool_config,
470                generation_config: Some(ApiGenerationConfig {
471                    max_output_tokens: Some(request.max_tokens),
472                    thinking_config,
473                    response_mime_type,
474                    response_schema,
475                }),
476                cached_content: request.cached_content.as_deref(),
477            };
478
479            log::debug!(
480                "Vertex AI streaming LLM request model={} max_tokens={}",
481                self.model,
482                request.max_tokens
483            );
484
485            let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
486
487            let Ok(response) = self
488                .client
489                .post(&url)
490                .header("Content-Type", "application/json")
491                .bearer_auth(&self.access_token)
492                .json(&api_request)
493                .send()
494                .await
495            else {
496                yield Err(anyhow::anyhow!("request failed"));
497                return;
498            };
499
500            let status = response.status();
501            if !status.is_success() {
502                let body = response.text().await.unwrap_or_default();
503                let kind = if status == StatusCode::TOO_MANY_REQUESTS {
504                    StreamErrorKind::RateLimited
505                } else if status.is_server_error() {
506                    StreamErrorKind::ServerError
507                } else {
508                    StreamErrorKind::InvalidRequest
509                };
510                log::warn!("Vertex AI error status={status} body={body}");
511                yield Ok(StreamDelta::Error {
512                    message: body,
513                    kind,
514                });
515                return;
516            }
517
518            let mut inner = stream_gemini_response(response);
519            while let Some(item) = futures::StreamExt::next(&mut inner).await {
520                yield item;
521            }
522        })
523    }
524}
525
526// ============================================================================
527// Claude path (publishers/anthropic)
528// ============================================================================
529
530impl VertexProvider {
531    async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
532        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
533            Ok(thinking) => thinking,
534            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
535        };
536        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
537            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
538        }
539        let messages = Self::build_cached_vertex_claude_messages(&request);
540        let tools = anthropic_data::build_api_tools(&request);
541        let thinking = thinking_config
542            .as_ref()
543            .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
544        let output_config = thinking_config
545            .as_ref()
546            .and_then(|t| t.effort)
547            .map(|effort| anthropic_data::ApiOutputConfig { effort });
548        let system = Self::build_vertex_claude_system_prompt(&request.system);
549        let tool_choice = request
550            .tool_choice
551            .as_ref()
552            .map(anthropic_data::ApiToolChoice::from_tool_choice);
553
554        let api_request = anthropic_data::ApiMessagesRequest {
555            model: None, // model is in the URL for Vertex
556            max_tokens: request.max_tokens,
557            system,
558            messages: &messages,
559            tools: tools.as_deref(),
560            tool_choice,
561            stream: false,
562            thinking,
563            output_config,
564            anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
565        };
566
567        log::debug!(
568            "Vertex AI (Claude) LLM request model={} max_tokens={}",
569            self.model,
570            request.max_tokens
571        );
572
573        if log::log_enabled!(log::Level::Debug) {
574            match serde_json::to_string_pretty(&api_request) {
575                Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
576                Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
577            }
578        }
579
580        let url = format!("{}:rawPredict", self.base_url("anthropic"));
581
582        let response = self
583            .client
584            .post(&url)
585            .header("Content-Type", "application/json")
586            .bearer_auth(&self.access_token)
587            .json(&api_request)
588            .send()
589            .await
590            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
591
592        let status = response.status();
593        let bytes = response
594            .bytes()
595            .await
596            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
597
598        log::debug!(
599            "Vertex AI (Claude) response status={} body_len={}",
600            status,
601            bytes.len()
602        );
603
604        if status == StatusCode::TOO_MANY_REQUESTS {
605            return Ok(ChatOutcome::RateLimited);
606        }
607
608        if status.is_server_error() {
609            let body = String::from_utf8_lossy(&bytes);
610            log::error!("Vertex AI (Claude) server error status={status} body={body}");
611            return Ok(ChatOutcome::ServerError(body.into_owned()));
612        }
613
614        if status.is_client_error() {
615            let body = String::from_utf8_lossy(&bytes);
616            log::warn!("Vertex AI (Claude) client error status={status} body={body}");
617            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
618        }
619
620        let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
621            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
622
623        log::debug!(
624            "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
625            api_response.id,
626            api_response.model,
627            api_response.stop_reason,
628            api_response.usage.total_input_tokens(),
629            api_response.usage.output,
630            api_response.content.len()
631        );
632
633        Ok(ChatOutcome::Success(Self::map_claude_response(
634            api_response,
635        )))
636    }
637
638    #[allow(clippy::too_many_lines)]
639    fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
640        Box::pin(async_stream::stream! {
641            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
642                Ok(thinking) => thinking,
643                Err(error) => {
644                    yield Ok(StreamDelta::Error {
645                        message: error.to_string(),
646                        kind: StreamErrorKind::InvalidRequest,
647                    });
648                    return;
649                }
650            };
651            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
652                yield Ok(StreamDelta::Error {
653                    message: error.to_string(),
654                    kind: StreamErrorKind::InvalidRequest,
655                });
656                return;
657            }
658            let messages = Self::build_cached_vertex_claude_messages(&request);
659            let tools = anthropic_data::build_api_tools(&request);
660            let thinking = thinking_config
661                .as_ref()
662                .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
663            let output_config = thinking_config
664                .as_ref()
665                .and_then(|t| t.effort)
666                .map(|effort| anthropic_data::ApiOutputConfig { effort });
667            let system = Self::build_vertex_claude_system_prompt(&request.system);
668            let tool_choice = request
669                .tool_choice
670                .as_ref()
671                .map(anthropic_data::ApiToolChoice::from_tool_choice);
672
673            let api_request = anthropic_data::ApiMessagesRequest {
674                model: None, // model is in the URL for Vertex
675                max_tokens: request.max_tokens,
676                system,
677                messages: &messages,
678                tools: tools.as_deref(),
679                tool_choice,
680                stream: true,
681                thinking,
682                output_config,
683                anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
684            };
685
686            log::debug!(
687                "Vertex AI (Claude) streaming request model={} max_tokens={}",
688                self.model,
689                request.max_tokens
690            );
691
692            if log::log_enabled!(log::Level::Debug) {
693                match serde_json::to_string_pretty(&api_request) {
694                    Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
695                    Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
696                }
697            }
698
699            let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
700
701            let response = match self
702                .client
703                .post(&url)
704                .header("Content-Type", "application/json")
705                .bearer_auth(&self.access_token)
706                .json(&api_request)
707                .send()
708                .await
709            {
710                Ok(r) => r,
711                Err(e) => {
712                    yield Err(anyhow::anyhow!("request failed: {e}"));
713                    return;
714                }
715            };
716
717            let status = response.status();
718
719            if status == StatusCode::TOO_MANY_REQUESTS {
720                yield Ok(StreamDelta::Error {
721                    message: "Rate limited".to_string(),
722                    kind: StreamErrorKind::RateLimited,
723                });
724                return;
725            }
726
727            if status.is_server_error() {
728                let body = response.text().await.unwrap_or_default();
729                log::error!("Vertex AI (Claude) server error status={status} body={body}");
730                yield Ok(StreamDelta::Error {
731                    message: body,
732                    kind: StreamErrorKind::ServerError,
733                });
734                return;
735            }
736
737            if status.is_client_error() {
738                let body = response.text().await.unwrap_or_default();
739                log::warn!("Vertex AI (Claude) client error status={status} body={body}");
740                yield Ok(StreamDelta::Error {
741                    message: body,
742                    kind: StreamErrorKind::InvalidRequest,
743                });
744                return;
745            }
746
747            // Process SSE stream using the Anthropic SSE parser
748            let mut stream = response.bytes_stream();
749            let mut buffer = String::new();
750            let mut input_tokens: u32 = 0;
751            let mut output_tokens: u32 = 0;
752            let mut cached_input_tokens: u32 = 0;
753            let mut cache_creation_input_tokens: u32 = 0;
754            let mut tool_ids: std::collections::HashMap<usize, String> =
755                std::collections::HashMap::new();
756            let mut received_message_stop = false;
757            let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
758
759            while let Some(chunk_result) = stream.next().await {
760                let Ok(chunk) = chunk_result else {
761                    yield Err(anyhow::anyhow!("stream error"));
762                    return;
763                };
764
765                buffer.push_str(&String::from_utf8_lossy(&chunk));
766
767                // Process complete SSE events (terminated by a blank line)
768                while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
769                    if anthropic_data::is_message_stop_event(&event_block) {
770                        received_message_stop = true;
771                    }
772
773                    if let Some(delta) = anthropic_data::parse_sse_event(
774                        &event_block,
775                        &mut input_tokens,
776                        &mut output_tokens,
777                        &mut cached_input_tokens,
778                        &mut cache_creation_input_tokens,
779                        &mut tool_ids,
780                        &mut pending_stop_reason,
781                    ) {
782                        yield Ok(delta);
783                    }
784                    if anthropic_data::is_message_stop_event(&event_block) {
785                        yield Ok(StreamDelta::Done {
786                            stop_reason: pending_stop_reason.take(),
787                        });
788                    }
789                }
790            }
791
792            // Process remaining buffer
793            let remaining = buffer.trim();
794            if !remaining.is_empty() {
795                if anthropic_data::is_message_stop_event(remaining) {
796                    received_message_stop = true;
797                }
798
799                if let Some(delta) = anthropic_data::parse_sse_event(
800                    remaining,
801                    &mut input_tokens,
802                    &mut output_tokens,
803                    &mut cached_input_tokens,
804                    &mut cache_creation_input_tokens,
805                    &mut tool_ids,
806                    &mut pending_stop_reason,
807                ) {
808                    yield Ok(delta);
809                }
810                if anthropic_data::is_message_stop_event(remaining) {
811                    yield Ok(StreamDelta::Done {
812                        stop_reason: pending_stop_reason.take(),
813                    });
814                }
815            }
816
817            if !received_message_stop {
818                log::warn!(
819                    "Vertex AI (Claude) SSE stream ended without message_stop"
820                );
821                yield Ok(StreamDelta::Error {
822                    message: "Stream ended unexpectedly without completion".to_string(),
823                    kind: StreamErrorKind::ServerError,
824                });
825            }
826        })
827    }
828}
829
#[cfg(test)]
mod tests {
    use super::*;

    /// Builds a `VertexProvider` with a placeholder access token for the given
    /// project, region and model name.
    fn make_provider(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_string(),
            project.to_string(),
            region.to_string(),
            model.to_string(),
        )
    }

    /// Asserts that validating a fixed-budget thinking config against `model`
    /// fails with an error message that mentions `ThinkingConfig::adaptive()`.
    ///
    /// `#[track_caller]` keeps panic locations pointing at the calling test.
    #[track_caller]
    fn assert_budgeted_thinking_rejected(model: &str) {
        let provider = make_provider("project", "global", model);
        let budgeted = ThinkingConfig::new(10_000);

        let message = provider
            .validate_thinking_config(Some(&budgeted))
            .unwrap_err()
            .to_string();
        assert!(message.contains("ThinkingConfig::adaptive()"));
    }

    /// `new` stores the requested model and reports the "vertex" provider name.
    #[test]
    fn test_new_creates_provider() {
        let created = make_provider("my-project", "us-central1", "custom-model");

        assert_eq!(created.model(), "custom-model");
        assert_eq!(created.provider(), "vertex");
    }

    /// The `flash` factory selects the Gemini 3 Flash model constant.
    #[test]
    fn test_flash_factory() {
        let token = "token".to_string();
        let project = "my-project".to_string();
        let region = "us-central1".to_string();

        let flash = VertexProvider::flash(token, project, region);

        assert_eq!(flash.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(flash.provider(), "vertex");
    }

    /// The `pro` factory selects the Gemini 3.1 Pro model constant.
    #[test]
    fn test_pro_factory() {
        let token = "token".to_string();
        let project = "my-project".to_string();
        let region = "us-central1".to_string();

        let pro = VertexProvider::pro(token, project, region);

        assert_eq!(pro.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(pro.provider(), "vertex");
    }

    /// A cloned provider reports the same model and provider name as its source.
    #[test]
    fn test_provider_is_cloneable() {
        let source = make_provider("my-project", "us-central1", "test-model");
        let copy = source.clone();

        assert_eq!(source.model(), copy.model());
        assert_eq!(source.provider(), copy.provider());
    }

    /// `is_claude_model` is true for a `claude-*` model name and false for a
    /// Gemini model name.
    #[test]
    fn test_is_claude_model() {
        let claude = make_provider("project", "us-central1", "claude-sonnet-4-20250514");
        assert!(claude.is_claude_model());

        let gemini = make_provider("project", "us-central1", "gemini-3-flash-preview");
        assert!(!gemini.is_claude_model());
    }

    /// The `google` publisher URL embeds region, project and model.
    #[test]
    fn test_base_url_gemini() {
        let gemini = make_provider("my-project", "us-central1", "gemini-3-flash-preview");

        assert_eq!(
            gemini.base_url("google"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    /// The `anthropic` publisher URL embeds region, project and model.
    #[test]
    fn test_base_url_claude() {
        let claude = make_provider("my-project", "us-central1", "claude-sonnet-4-20250514");

        assert_eq!(
            claude.base_url("anthropic"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    /// A non-default region shows up in both the host prefix and the location
    /// path segment.
    #[test]
    fn test_base_url_with_different_region() {
        let regional = make_provider("other-project", "europe-west4", "gemini-3.1-pro-preview");
        let endpoint = regional.base_url("google");

        assert!(endpoint.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(endpoint.contains("/projects/other-project/"));
        assert!(endpoint.contains("/locations/europe-west4/"));
        assert!(endpoint.ends_with("/models/gemini-3.1-pro-preview"));
    }

    /// The `global` region uses the bare `aiplatform.googleapis.com` host.
    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let global = make_provider("my-project", "global", "gemini-3.1-pro-preview");

        assert_eq!(
            global.base_url("google"),
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    /// Budgeted thinking is rejected for `MODEL_SONNET_46`.
    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        assert_budgeted_thinking_rejected(MODEL_SONNET_46);
    }

    /// Budgeted thinking is rejected for `MODEL_OPUS_47`.
    #[test]
    fn test_vertex_claude_opus_47_rejects_budgeted_thinking() {
        assert_budgeted_thinking_rejected(MODEL_OPUS_47);
    }

    /// Budgeted thinking is rejected for `MODEL_OPUS_48`.
    #[test]
    fn test_vertex_claude_opus_48_rejects_budgeted_thinking() {
        assert_budgeted_thinking_rejected(MODEL_OPUS_48);
    }

    /// Budgeted thinking is rejected for `MODEL_FABLE_5`.
    #[test]
    fn test_vertex_claude_fable_5_rejects_budgeted_thinking() {
        assert_budgeted_thinking_rejected(MODEL_FABLE_5);
    }

    /// Pins the literal model identifiers exported by this module.
    #[test]
    fn test_model_constants() {
        let expectations = [
            (MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview"),
            (MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview"),
            (MODEL_GEMINI_3_PRO, "gemini-3.0-pro"),
        ];

        for (actual, expected) in expectations {
            assert_eq!(actual, expected);
        }
    }
}