Skip to main content

agent_sdk_providers/impls/
vertex.rs

//! Google Vertex AI provider implementation.
//!
//! This module provides an implementation of `LlmProvider` for the Google Vertex AI
//! platform. It supports both Gemini models (using the Gemini API format) and
//! Claude models (using the Anthropic Messages API format via `rawPredict` /
//! `streamRawPredict`).
//!
//! Publisher detection is automatic based on the model name:
//! - `claude-*` models route to `publishers/anthropic` using `rawPredict` / `streamRawPredict`
//! - All other models route to `publishers/google` using `generateContent` / `streamGenerateContent`
10
11use crate::attachments::validate_request_attachments;
12use crate::impls::anthropic::{MODEL_OPUS_46, MODEL_SONNET_46, data as anthropic_data};
13use crate::impls::gemini::data::{
14    ApiContent, ApiFunctionCallingConfig, ApiGenerateContentRequest, ApiGenerateContentResponse,
15    ApiGenerationConfig, ApiPart, ApiUsageMetadata, build_api_contents, build_content_blocks,
16    convert_tools_to_config, gemini_response_schema, map_finish_reason, map_thinking_config,
17    stream_gemini_response,
18};
19use crate::provider::LlmProvider;
20use crate::streaming::{StreamBox, StreamDelta, StreamErrorKind};
21use agent_sdk_foundation::llm::{
22    ChatOutcome, ChatRequest, ChatResponse, ThinkingConfig, ThinkingMode, Usage,
23};
24use anyhow::Result;
25use async_trait::async_trait;
26use futures::StreamExt;
27use reqwest::StatusCode;
28
/// Vertex AI model ID for Gemini 3 Flash (preview); used by `VertexProvider::flash`.
pub const MODEL_GEMINI_3_FLASH: &str = "gemini-3-flash-preview";
/// Vertex AI model ID for Gemini 3.1 Pro (preview); used by `VertexProvider::pro`.
pub const MODEL_GEMINI_31_PRO: &str = "gemini-3.1-pro-preview";
/// Legacy Gemini 3.0 Pro model ID, kept only so callers can still opt into it explicitly.
pub const MODEL_GEMINI_3_PRO: &str = "gemini-3.0-pro";

/// `anthropic_version` value sent in the body of every Claude request routed through Vertex AI.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
/// Upper bound for the provider's default `max_tokens` (see `default_max_tokens`).
const DEFAULT_SAFE_MAX_OUTPUT_TOKENS: u32 = 32_000;
38
39const fn vertex_cache_control() -> anthropic_data::ApiCacheControl {
40    anthropic_data::ApiCacheControl::ephemeral()
41}
42
43/// Google Vertex AI LLM provider.
44///
45/// Uses the same Gemini request/response format as `GeminiProvider` but
46/// authenticates via `OAuth2` Bearer tokens and routes through the Vertex AI
47/// regional endpoint.
48///
49/// Claude models are also supported — the provider detects the publisher from
50/// the model name and uses the appropriate API format automatically.
51#[derive(Clone)]
52pub struct VertexProvider {
53    client: reqwest::Client,
54    access_token: String,
55    project_id: String,
56    region: String,
57    model: String,
58    thinking: Option<ThinkingConfig>,
59}
60
61impl VertexProvider {
62    /// Create a new Vertex AI provider with full control over all parameters.
63    #[must_use]
64    pub fn new(access_token: String, project_id: String, region: String, model: String) -> Self {
65        let client = reqwest::Client::builder()
66            .connect_timeout(std::time::Duration::from_secs(30))
67            .tcp_keepalive(std::time::Duration::from_secs(30))
68            .build()
69            .unwrap_or_default();
70
71        Self {
72            client,
73            access_token,
74            project_id,
75            region,
76            model,
77            thinking: None,
78        }
79    }
80
81    /// Create a provider using Gemini 3 Flash Preview on Vertex AI.
82    #[must_use]
83    pub fn flash(access_token: String, project_id: String, region: String) -> Self {
84        Self::new(
85            access_token,
86            project_id,
87            region,
88            MODEL_GEMINI_3_FLASH.to_owned(),
89        )
90    }
91
92    /// Create a provider using Gemini 3.1 Pro Preview on Vertex AI.
93    #[must_use]
94    pub fn pro(access_token: String, project_id: String, region: String) -> Self {
95        Self::new(
96            access_token,
97            project_id,
98            region,
99            MODEL_GEMINI_31_PRO.to_owned(),
100        )
101    }
102
103    /// Detect whether the model is a Claude model (Anthropic publisher).
104    fn is_claude_model(&self) -> bool {
105        self.model.starts_with("claude-")
106    }
107
108    /// Build the base URL for the given publisher and model.
109    ///
110    /// For the `global` location the domain is `aiplatform.googleapis.com`
111    /// (no region prefix). Regional locations use `{region}-aiplatform.googleapis.com`.
112    fn base_url(&self, publisher: &str) -> String {
113        let domain = if self.region == "global" {
114            "aiplatform.googleapis.com".to_owned()
115        } else {
116            format!("{}-aiplatform.googleapis.com", self.region)
117        };
118        format!(
119            "https://{domain}/v1/projects/{project}/locations/{region}/publishers/{publisher}/models/{model}",
120            domain = domain,
121            region = self.region,
122            project = self.project_id,
123            publisher = publisher,
124            model = self.model,
125        )
126    }
127
128    /// Set the provider-owned thinking configuration for this model.
129    #[must_use]
130    pub const fn with_thinking(mut self, thinking: ThinkingConfig) -> Self {
131        self.thinking = Some(thinking);
132        self
133    }
134
135    fn requires_anthropic_adaptive_thinking(&self) -> bool {
136        matches!(self.model.as_str(), MODEL_SONNET_46 | MODEL_OPUS_46)
137    }
138
139    fn build_cached_vertex_claude_messages(
140        request: &ChatRequest,
141    ) -> Vec<anthropic_data::ApiMessage> {
142        let mut messages = anthropic_data::build_api_messages(request);
143        anthropic_data::apply_cache_control_to_last_user_message(
144            &mut messages,
145            vertex_cache_control(),
146        );
147        messages
148    }
149
150    fn build_vertex_claude_system_prompt(
151        system: &str,
152    ) -> Option<anthropic_data::ApiSystemPrompt<'_>> {
153        anthropic_data::build_api_system_prompt(system, Some(vertex_cache_control()))
154    }
155
156    fn map_claude_response(api_response: anthropic_data::ApiResponse) -> ChatResponse {
157        let content = anthropic_data::map_content_blocks(api_response.content);
158        let stop_reason = api_response
159            .stop_reason
160            .as_ref()
161            .map(anthropic_data::map_stop_reason);
162
163        ChatResponse {
164            id: api_response.id,
165            content,
166            model: api_response.model,
167            stop_reason,
168            usage: Usage {
169                input_tokens: api_response.usage.total_input_tokens(),
170                output_tokens: api_response.usage.output,
171                cached_input_tokens: api_response.usage.cached_input_tokens(),
172                cache_creation_input_tokens: api_response.usage.cache_creation_input_tokens(),
173            },
174        }
175    }
176}
177
178#[async_trait]
179impl LlmProvider for VertexProvider {
180    async fn chat(&self, request: ChatRequest) -> Result<ChatOutcome> {
181        if self.is_claude_model() {
182            return self.chat_claude(request).await;
183        }
184        self.chat_gemini(request).await
185    }
186
187    fn chat_stream(&self, request: ChatRequest) -> StreamBox<'_> {
188        if self.is_claude_model() {
189            return self.chat_stream_claude(request);
190        }
191        self.chat_stream_gemini(request)
192    }
193
194    fn validate_thinking_config(&self, thinking: Option<&ThinkingConfig>) -> Result<()> {
195        let Some(thinking) = thinking else {
196            return Ok(());
197        };
198
199        if self
200            .capabilities()
201            .is_some_and(|caps| !caps.supports_thinking)
202        {
203            return Err(anyhow::anyhow!(
204                "thinking is not supported for provider={} model={}",
205                self.provider(),
206                self.model()
207            ));
208        }
209
210        if matches!(thinking.mode, ThinkingMode::Adaptive)
211            && !self
212                .capabilities()
213                .is_some_and(|caps| caps.supports_adaptive_thinking)
214        {
215            return Err(anyhow::anyhow!(
216                "adaptive thinking is not supported for provider={} model={}",
217                self.provider(),
218                self.model()
219            ));
220        }
221
222        if self.is_claude_model()
223            && self.requires_anthropic_adaptive_thinking()
224            && matches!(thinking.mode, ThinkingMode::Enabled { .. })
225        {
226            return Err(anyhow::anyhow!(
227                "budget_tokens thinking is deprecated for provider={} model={}; use ThinkingConfig::adaptive() instead",
228                self.provider(),
229                self.model()
230            ));
231        }
232
233        Ok(())
234    }
235
236    fn model(&self) -> &str {
237        &self.model
238    }
239
240    fn provider(&self) -> &'static str {
241        "vertex"
242    }
243
244    fn configured_thinking(&self) -> Option<&ThinkingConfig> {
245        self.thinking.as_ref()
246    }
247
248    fn default_max_tokens(&self) -> u32 {
249        let provider = if self.is_claude_model() {
250            "anthropic"
251        } else {
252            "gemini"
253        };
254        let model_max = self
255            .capabilities()
256            .and_then(|caps| caps.max_output_tokens)
257            .or_else(|| {
258                crate::model_capabilities::default_max_output_tokens(provider, self.model())
259            })
260            .unwrap_or(4096);
261        model_max.clamp(4096, DEFAULT_SAFE_MAX_OUTPUT_TOKENS)
262    }
263}
264
265// ============================================================================
266// Gemini path (publishers/google)
267// ============================================================================
268
269impl VertexProvider {
270    #[allow(clippy::too_many_lines)]
271    async fn chat_gemini(&self, request: ChatRequest) -> Result<ChatOutcome> {
272        let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
273            Ok(thinking) => thinking,
274            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
275        };
276        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
277            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
278        }
279        let contents = build_api_contents(&request.messages);
280        let tools = request.tools.map(convert_tools_to_config);
281        let tool_config = request
282            .tool_choice
283            .as_ref()
284            .map(ApiFunctionCallingConfig::from_tool_choice);
285        let system_instruction = if request.system.is_empty() {
286            None
287        } else {
288            Some(ApiContent {
289                role: None,
290                parts: vec![ApiPart::Text {
291                    text: request.system.clone(),
292                    thought_signature: None,
293                }],
294            })
295        };
296
297        let thinking_config = thinking.as_ref().map(map_thinking_config);
298        let (response_mime_type, response_schema) =
299            request.response_format.as_ref().map_or((None, None), |rf| {
300                (
301                    Some("application/json"),
302                    Some(gemini_response_schema(&rf.schema)),
303                )
304            });
305
306        let api_request = ApiGenerateContentRequest {
307            contents: &contents,
308            system_instruction: system_instruction.as_ref(),
309            tools: tools.as_ref().map(std::slice::from_ref),
310            tool_config,
311            generation_config: Some(ApiGenerationConfig {
312                max_output_tokens: Some(request.max_tokens),
313                thinking_config,
314                response_mime_type,
315                response_schema,
316            }),
317            cached_content: request.cached_content.as_deref(),
318        };
319
320        log::debug!(
321            "Vertex AI LLM request model={} max_tokens={}",
322            self.model,
323            request.max_tokens
324        );
325
326        let url = format!("{}:generateContent", self.base_url("google"));
327
328        let response = self
329            .client
330            .post(&url)
331            .header("Content-Type", "application/json")
332            .bearer_auth(&self.access_token)
333            .json(&api_request)
334            .send()
335            .await
336            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
337
338        let status = response.status();
339        let bytes = response
340            .bytes()
341            .await
342            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
343
344        log::debug!(
345            "Vertex AI LLM response status={} body_len={}",
346            status,
347            bytes.len()
348        );
349
350        if status == StatusCode::TOO_MANY_REQUESTS {
351            return Ok(ChatOutcome::RateLimited);
352        }
353
354        if status.is_server_error() {
355            let body = String::from_utf8_lossy(&bytes);
356            log::error!("Vertex AI server error status={status} body={body}");
357            return Ok(ChatOutcome::ServerError(body.into_owned()));
358        }
359
360        if status.is_client_error() {
361            let body = String::from_utf8_lossy(&bytes);
362            log::warn!("Vertex AI client error status={status} body={body}");
363            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
364        }
365
366        let api_response: ApiGenerateContentResponse = serde_json::from_slice(&bytes)
367            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
368
369        let candidate = api_response
370            .candidates
371            .into_iter()
372            .next()
373            .ok_or_else(|| anyhow::anyhow!("no candidates in response"))?;
374
375        let content = build_content_blocks(&candidate.content);
376
377        if content.is_empty() && !candidate.content.parts.is_empty() {
378            log::warn!(
379                "Vertex AI parts not converted to content blocks raw_parts={:?}",
380                candidate.content.parts
381            );
382        }
383
384        let has_tool_calls = content
385            .iter()
386            .any(|b| matches!(b, agent_sdk_foundation::llm::ContentBlock::ToolUse { .. }));
387
388        let stop_reason = candidate
389            .finish_reason
390            .as_ref()
391            .map(|r| map_finish_reason(r, has_tool_calls));
392
393        let usage = api_response
394            .usage_metadata
395            .unwrap_or(ApiUsageMetadata {
396                prompt: 0,
397                candidates: 0,
398                cached_content: 0,
399            })
400            .into_usage();
401
402        Ok(ChatOutcome::Success(ChatResponse {
403            id: String::new(),
404            content,
405            model: self.model.clone(),
406            stop_reason,
407            usage,
408        }))
409    }
410
411    fn chat_stream_gemini(&self, request: ChatRequest) -> StreamBox<'_> {
412        Box::pin(async_stream::stream! {
413            let thinking = match self.resolve_thinking_config(request.thinking.as_ref()) {
414                Ok(thinking) => thinking,
415                Err(error) => {
416                    yield Ok(StreamDelta::Error {
417                        message: error.to_string(),
418                        kind: StreamErrorKind::InvalidRequest,
419                    });
420                    return;
421                }
422            };
423            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
424                yield Ok(StreamDelta::Error {
425                    message: error.to_string(),
426                    kind: StreamErrorKind::InvalidRequest,
427                });
428                return;
429            }
430            let contents = build_api_contents(&request.messages);
431            let tools = request.tools.map(convert_tools_to_config);
432            let tool_config = request
433                .tool_choice
434                .as_ref()
435                .map(ApiFunctionCallingConfig::from_tool_choice);
436            let system_instruction = if request.system.is_empty() {
437                None
438            } else {
439                Some(ApiContent {
440                    role: None,
441                    parts: vec![ApiPart::Text {
442                        text: request.system.clone(),
443                        thought_signature: None,
444                    }],
445                })
446            };
447
448            let thinking_config = thinking.as_ref().map(map_thinking_config);
449            let (response_mime_type, response_schema) = request
450                .response_format
451                .as_ref()
452                .map_or((None, None), |rf| {
453                    (
454                        Some("application/json"),
455                        Some(gemini_response_schema(&rf.schema)),
456                    )
457                });
458
459            let api_request = ApiGenerateContentRequest {
460                contents: &contents,
461                system_instruction: system_instruction.as_ref(),
462                tools: tools.as_ref().map(std::slice::from_ref),
463                tool_config,
464                generation_config: Some(ApiGenerationConfig {
465                    max_output_tokens: Some(request.max_tokens),
466                    thinking_config,
467                    response_mime_type,
468                    response_schema,
469                }),
470                cached_content: request.cached_content.as_deref(),
471            };
472
473            log::debug!(
474                "Vertex AI streaming LLM request model={} max_tokens={}",
475                self.model,
476                request.max_tokens
477            );
478
479            let url = format!("{}:streamGenerateContent?alt=sse", self.base_url("google"));
480
481            let Ok(response) = self
482                .client
483                .post(&url)
484                .header("Content-Type", "application/json")
485                .bearer_auth(&self.access_token)
486                .json(&api_request)
487                .send()
488                .await
489            else {
490                yield Err(anyhow::anyhow!("request failed"));
491                return;
492            };
493
494            let status = response.status();
495            if !status.is_success() {
496                let body = response.text().await.unwrap_or_default();
497                let kind = if status == StatusCode::TOO_MANY_REQUESTS {
498                    StreamErrorKind::RateLimited
499                } else if status.is_server_error() {
500                    StreamErrorKind::ServerError
501                } else {
502                    StreamErrorKind::InvalidRequest
503                };
504                log::warn!("Vertex AI error status={status} body={body}");
505                yield Ok(StreamDelta::Error {
506                    message: body,
507                    kind,
508                });
509                return;
510            }
511
512            let mut inner = stream_gemini_response(response);
513            while let Some(item) = futures::StreamExt::next(&mut inner).await {
514                yield item;
515            }
516        })
517    }
518}
519
520// ============================================================================
521// Claude path (publishers/anthropic)
522// ============================================================================
523
524impl VertexProvider {
525    async fn chat_claude(&self, request: ChatRequest) -> Result<ChatOutcome> {
526        let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
527            Ok(thinking) => thinking,
528            Err(error) => return Ok(ChatOutcome::InvalidRequest(error.to_string())),
529        };
530        if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
531            return Ok(ChatOutcome::InvalidRequest(error.to_string()));
532        }
533        let messages = Self::build_cached_vertex_claude_messages(&request);
534        let tools = anthropic_data::build_api_tools(&request);
535        let thinking = thinking_config
536            .as_ref()
537            .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
538        let output_config = thinking_config
539            .as_ref()
540            .and_then(|t| t.effort)
541            .map(|effort| anthropic_data::ApiOutputConfig { effort });
542        let system = Self::build_vertex_claude_system_prompt(&request.system);
543        let tool_choice = request
544            .tool_choice
545            .as_ref()
546            .map(anthropic_data::ApiToolChoice::from_tool_choice);
547
548        let api_request = anthropic_data::ApiMessagesRequest {
549            model: None, // model is in the URL for Vertex
550            max_tokens: request.max_tokens,
551            system,
552            messages: &messages,
553            tools: tools.as_deref(),
554            tool_choice,
555            stream: false,
556            thinking,
557            output_config,
558            anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
559        };
560
561        log::debug!(
562            "Vertex AI (Claude) LLM request model={} max_tokens={}",
563            self.model,
564            request.max_tokens
565        );
566
567        if log::log_enabled!(log::Level::Debug) {
568            match serde_json::to_string_pretty(&api_request) {
569                Ok(json) => log::debug!("Vertex AI (Claude) request payload:\n{json}"),
570                Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
571            }
572        }
573
574        let url = format!("{}:rawPredict", self.base_url("anthropic"));
575
576        let response = self
577            .client
578            .post(&url)
579            .header("Content-Type", "application/json")
580            .bearer_auth(&self.access_token)
581            .json(&api_request)
582            .send()
583            .await
584            .map_err(|e| anyhow::anyhow!("request failed: {e}"))?;
585
586        let status = response.status();
587        let bytes = response
588            .bytes()
589            .await
590            .map_err(|e| anyhow::anyhow!("failed to read response body: {e}"))?;
591
592        log::debug!(
593            "Vertex AI (Claude) response status={} body_len={}",
594            status,
595            bytes.len()
596        );
597
598        if status == StatusCode::TOO_MANY_REQUESTS {
599            return Ok(ChatOutcome::RateLimited);
600        }
601
602        if status.is_server_error() {
603            let body = String::from_utf8_lossy(&bytes);
604            log::error!("Vertex AI (Claude) server error status={status} body={body}");
605            return Ok(ChatOutcome::ServerError(body.into_owned()));
606        }
607
608        if status.is_client_error() {
609            let body = String::from_utf8_lossy(&bytes);
610            log::warn!("Vertex AI (Claude) client error status={status} body={body}");
611            return Ok(ChatOutcome::InvalidRequest(body.into_owned()));
612        }
613
614        let api_response: anthropic_data::ApiResponse = serde_json::from_slice(&bytes)
615            .map_err(|e| anyhow::anyhow!("failed to parse response: {e}"))?;
616
617        log::debug!(
618            "Vertex AI (Claude) response: id={} model={} stop_reason={:?} usage={{input_tokens={}, output_tokens={}}} content_blocks={}",
619            api_response.id,
620            api_response.model,
621            api_response.stop_reason,
622            api_response.usage.total_input_tokens(),
623            api_response.usage.output,
624            api_response.content.len()
625        );
626
627        Ok(ChatOutcome::Success(Self::map_claude_response(
628            api_response,
629        )))
630    }
631
632    #[allow(clippy::too_many_lines)]
633    fn chat_stream_claude(&self, request: ChatRequest) -> StreamBox<'_> {
634        Box::pin(async_stream::stream! {
635            let thinking_config = match self.resolve_thinking_config(request.thinking.as_ref()) {
636                Ok(thinking) => thinking,
637                Err(error) => {
638                    yield Ok(StreamDelta::Error {
639                        message: error.to_string(),
640                        kind: StreamErrorKind::InvalidRequest,
641                    });
642                    return;
643                }
644            };
645            if let Err(error) = validate_request_attachments(self.provider(), self.model(), &request) {
646                yield Ok(StreamDelta::Error {
647                    message: error.to_string(),
648                    kind: StreamErrorKind::InvalidRequest,
649                });
650                return;
651            }
652            let messages = Self::build_cached_vertex_claude_messages(&request);
653            let tools = anthropic_data::build_api_tools(&request);
654            let thinking = thinking_config
655                .as_ref()
656                .map(anthropic_data::ApiThinkingConfig::from_thinking_config);
657            let output_config = thinking_config
658                .as_ref()
659                .and_then(|t| t.effort)
660                .map(|effort| anthropic_data::ApiOutputConfig { effort });
661            let system = Self::build_vertex_claude_system_prompt(&request.system);
662            let tool_choice = request
663                .tool_choice
664                .as_ref()
665                .map(anthropic_data::ApiToolChoice::from_tool_choice);
666
667            let api_request = anthropic_data::ApiMessagesRequest {
668                model: None, // model is in the URL for Vertex
669                max_tokens: request.max_tokens,
670                system,
671                messages: &messages,
672                tools: tools.as_deref(),
673                tool_choice,
674                stream: true,
675                thinking,
676                output_config,
677                anthropic_version: Some(VERTEX_ANTHROPIC_VERSION),
678            };
679
680            log::debug!(
681                "Vertex AI (Claude) streaming request model={} max_tokens={}",
682                self.model,
683                request.max_tokens
684            );
685
686            if log::log_enabled!(log::Level::Debug) {
687                match serde_json::to_string_pretty(&api_request) {
688                    Ok(json) => log::debug!("Vertex AI (Claude) streaming request payload:\n{json}"),
689                    Err(e) => log::debug!("Failed to serialize request for logging: {e}"),
690                }
691            }
692
693            let url = format!("{}:streamRawPredict", self.base_url("anthropic"));
694
695            let response = match self
696                .client
697                .post(&url)
698                .header("Content-Type", "application/json")
699                .bearer_auth(&self.access_token)
700                .json(&api_request)
701                .send()
702                .await
703            {
704                Ok(r) => r,
705                Err(e) => {
706                    yield Err(anyhow::anyhow!("request failed: {e}"));
707                    return;
708                }
709            };
710
711            let status = response.status();
712
713            if status == StatusCode::TOO_MANY_REQUESTS {
714                yield Ok(StreamDelta::Error {
715                    message: "Rate limited".to_string(),
716                    kind: StreamErrorKind::RateLimited,
717                });
718                return;
719            }
720
721            if status.is_server_error() {
722                let body = response.text().await.unwrap_or_default();
723                log::error!("Vertex AI (Claude) server error status={status} body={body}");
724                yield Ok(StreamDelta::Error {
725                    message: body,
726                    kind: StreamErrorKind::ServerError,
727                });
728                return;
729            }
730
731            if status.is_client_error() {
732                let body = response.text().await.unwrap_or_default();
733                log::warn!("Vertex AI (Claude) client error status={status} body={body}");
734                yield Ok(StreamDelta::Error {
735                    message: body,
736                    kind: StreamErrorKind::InvalidRequest,
737                });
738                return;
739            }
740
741            // Process SSE stream using the Anthropic SSE parser
742            let mut stream = response.bytes_stream();
743            let mut buffer = String::new();
744            let mut input_tokens: u32 = 0;
745            let mut output_tokens: u32 = 0;
746            let mut cached_input_tokens: u32 = 0;
747            let mut cache_creation_input_tokens: u32 = 0;
748            let mut tool_ids: std::collections::HashMap<usize, String> =
749                std::collections::HashMap::new();
750            let mut received_message_stop = false;
751            let mut pending_stop_reason: Option<agent_sdk_foundation::llm::StopReason> = None;
752
753            while let Some(chunk_result) = stream.next().await {
754                let Ok(chunk) = chunk_result else {
755                    yield Err(anyhow::anyhow!("stream error"));
756                    return;
757                };
758
759                buffer.push_str(&String::from_utf8_lossy(&chunk));
760
761                // Process complete SSE events (terminated by a blank line)
762                while let Some(event_block) = anthropic_data::take_next_sse_event(&mut buffer) {
763                    if anthropic_data::is_message_stop_event(&event_block) {
764                        received_message_stop = true;
765                    }
766
767                    if let Some(delta) = anthropic_data::parse_sse_event(
768                        &event_block,
769                        &mut input_tokens,
770                        &mut output_tokens,
771                        &mut cached_input_tokens,
772                        &mut cache_creation_input_tokens,
773                        &mut tool_ids,
774                        &mut pending_stop_reason,
775                    ) {
776                        yield Ok(delta);
777                    }
778                    if anthropic_data::is_message_stop_event(&event_block) {
779                        yield Ok(StreamDelta::Done {
780                            stop_reason: pending_stop_reason.take(),
781                        });
782                    }
783                }
784            }
785
786            // Process remaining buffer
787            let remaining = buffer.trim();
788            if !remaining.is_empty() {
789                if anthropic_data::is_message_stop_event(remaining) {
790                    received_message_stop = true;
791                }
792
793                if let Some(delta) = anthropic_data::parse_sse_event(
794                    remaining,
795                    &mut input_tokens,
796                    &mut output_tokens,
797                    &mut cached_input_tokens,
798                    &mut cache_creation_input_tokens,
799                    &mut tool_ids,
800                    &mut pending_stop_reason,
801                ) {
802                    yield Ok(delta);
803                }
804                if anthropic_data::is_message_stop_event(remaining) {
805                    yield Ok(StreamDelta::Done {
806                        stop_reason: pending_stop_reason.take(),
807                    });
808                }
809            }
810
811            if !received_message_stop {
812                log::warn!(
813                    "Vertex AI (Claude) SSE stream ended without message_stop"
814                );
815                yield Ok(StreamDelta::Error {
816                    message: "Stream ended unexpectedly without completion".to_string(),
817                    kind: StreamErrorKind::ServerError,
818                });
819            }
820        })
821    }
822}
823
#[cfg(test)]
mod tests {
    use super::*;

    // Every test uses the same placeholder credential, so build providers through
    // one helper and let each test spell out only the project/region/model it
    // actually cares about.
    fn make_provider(project: &str, region: &str, model: &str) -> VertexProvider {
        VertexProvider::new(
            "token".to_string(),
            project.to_string(),
            region.to_string(),
            model.to_string(),
        )
    }

    #[test]
    fn test_new_creates_provider() {
        let p = make_provider("my-project", "us-central1", "custom-model");

        assert_eq!(p.model(), "custom-model");
        assert_eq!(p.provider(), "vertex");
    }

    #[test]
    fn test_flash_factory() {
        let p = VertexProvider::flash(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(p.model(), MODEL_GEMINI_3_FLASH);
        assert_eq!(p.provider(), "vertex");
    }

    #[test]
    fn test_pro_factory() {
        let p = VertexProvider::pro(
            "token".to_string(),
            "my-project".to_string(),
            "us-central1".to_string(),
        );

        assert_eq!(p.model(), MODEL_GEMINI_31_PRO);
        assert_eq!(p.provider(), "vertex");
    }

    #[test]
    fn test_provider_is_cloneable() {
        let original = make_provider("my-project", "us-central1", "test-model");
        let copy = original.clone();

        assert_eq!(original.model(), copy.model());
        assert_eq!(original.provider(), copy.provider());
    }

    #[test]
    fn test_is_claude_model() {
        // A `claude-*` model name should be detected as Claude; a Gemini name should not.
        let claude = make_provider("project", "us-central1", "claude-sonnet-4-20250514");
        let gemini = make_provider("project", "us-central1", "gemini-3-flash-preview");

        assert!(claude.is_claude_model());
        assert!(!gemini.is_claude_model());
    }

    #[test]
    fn test_base_url_gemini() {
        let p = make_provider("my-project", "us-central1", "gemini-3-flash-preview");

        assert_eq!(
            p.base_url("google"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/google/models/gemini-3-flash-preview"
        );
    }

    #[test]
    fn test_base_url_claude() {
        let p = make_provider("my-project", "us-central1", "claude-sonnet-4-20250514");

        assert_eq!(
            p.base_url("anthropic"),
            "https://us-central1-aiplatform.googleapis.com/v1/projects/my-project/locations/us-central1/publishers/anthropic/models/claude-sonnet-4-20250514"
        );
    }

    #[test]
    fn test_base_url_with_different_region() {
        let p = make_provider("other-project", "europe-west4", "gemini-3.1-pro-preview");
        let url = p.base_url("google");

        // Region must show up both in the host prefix and in the resource path.
        assert!(url.starts_with("https://europe-west4-aiplatform.googleapis.com/"));
        assert!(url.contains("/projects/other-project/"));
        assert!(url.contains("/locations/europe-west4/"));
        assert!(url.ends_with("/models/gemini-3.1-pro-preview"));
    }

    #[test]
    fn test_base_url_global_region_has_no_prefix() {
        let p = make_provider("my-project", "global", "gemini-3.1-pro-preview");

        // The `global` location uses the bare host, without a `<region>-` prefix.
        assert_eq!(
            p.base_url("google"),
            "https://aiplatform.googleapis.com/v1/projects/my-project/locations/global/publishers/google/models/gemini-3.1-pro-preview"
        );
    }

    #[test]
    fn test_vertex_claude_46_rejects_budgeted_thinking() {
        let p = make_provider("project", "global", MODEL_SONNET_46);

        let err = p
            .validate_thinking_config(Some(&ThinkingConfig::new(10_000)))
            .expect_err("a token-budgeted thinking config should be rejected here");
        assert!(err.to_string().contains("ThinkingConfig::adaptive()"));
    }

    #[test]
    fn test_model_constants() {
        // Pin the wire-level model identifiers so an accidental rename is caught.
        let table = [
            (MODEL_GEMINI_3_FLASH, "gemini-3-flash-preview"),
            (MODEL_GEMINI_31_PRO, "gemini-3.1-pro-preview"),
            (MODEL_GEMINI_3_PRO, "gemini-3.0-pro"),
        ];
        for (actual, expected) in table {
            assert_eq!(actual, expected);
        }
    }
}