// codetether_agent/provider/vertex_anthropic.rs
//! Vertex AI Anthropic provider implementation
//!
//! Claude models (Sonnet 4.6, Opus 4, etc.) via Google Cloud Vertex AI.
//! Uses service account JWT auth to obtain OAuth2 access tokens.
//!
//! Key differences from native Anthropic API:
//! - Model is specified in URL path, not request body
//! - `anthropic_version` must be set to `vertex-2023-10-16`
//! - Uses Bearer token auth (OAuth2) instead of x-api-key header
//!
//! Reference: https://cloud.google.com/vertex-ai/generative-ai/docs/partner-models/use-claude

13use super::util;
14use super::{
15    CompletionRequest, CompletionResponse, ContentPart, FinishReason, Message, ModelInfo, Provider,
16    Role, StreamChunk, ToolDefinition, Usage,
17};
18use anyhow::{Context, Result};
19use async_trait::async_trait;
20use futures::StreamExt;
21use jsonwebtoken::{Algorithm, EncodingKey, Header};
22use reqwest::Client;
23use serde::Deserialize;
24use serde_json::{Value, json};
25use std::sync::Arc;
26use std::time::Duration;
27use tokio::sync::RwLock;
28
// HTTP client tuning: overall request deadline, connection deadline, and
// the number of attempts made for retryable failures (timeout / 503).
const REQUEST_TIMEOUT: Duration = Duration::from_secs(120);
const CONNECT_TIMEOUT: Duration = Duration::from_secs(10);
const MAX_RETRIES: u32 = 3;

// NOTE(review): region is fixed at compile time — presumably all Claude
// deployments for this project live in us-east5; confirm before reuse.
const VERTEX_REGION: &str = "us-east5";
// OAuth2 token endpoint used when the SA key carries no `token_uri`.
const GOOGLE_TOKEN_URL: &str = "https://oauth2.googleapis.com/token";
// OAuth scope requested for Vertex AI access.
const VERTEX_SCOPE: &str = "https://www.googleapis.com/auth/cloud-platform";
// Mandatory `anthropic_version` value for Claude on Vertex AI.
const VERTEX_ANTHROPIC_VERSION: &str = "vertex-2023-10-16";
37
/// Cached OAuth2 access token with expiration tracking
struct CachedToken {
    // Bearer token string as returned by the Google token endpoint.
    token: String,
    // Monotonic deadline; `get_access_token` refreshes ~5 min before this.
    expires_at: std::time::Instant,
}
43
44/// GCP service account key (parsed from JSON)
45#[derive(Debug, Clone, Deserialize)]
46struct ServiceAccountKey {
47    client_email: String,
48    private_key: String,
49    token_uri: Option<String>,
50    project_id: Option<String>,
51}
52
/// JWT claims for GCP service account auth
///
/// Claim set for Google's OAuth2 JWT-bearer grant.
#[derive(serde::Serialize)]
struct JwtClaims {
    // Issuer: the service account e-mail.
    iss: String,
    // Space-separated OAuth scopes being requested.
    scope: String,
    // Audience: the token endpoint URL the assertion is sent to.
    aud: String,
    // Issued-at and expiry, in seconds since the Unix epoch.
    iat: u64,
    exp: u64,
}
62
/// Provider for Claude models served through Google Cloud Vertex AI.
pub struct VertexAnthropicProvider {
    // Shared HTTP client with connect/request timeouts preconfigured.
    client: Client,
    // GCP project the Claude models are deployed under.
    project_id: String,
    // URL prefix ending in `.../publishers/anthropic/models`; the model id
    // and `:rawPredict` verb are appended per request.
    base_url: String,
    // Parsed service account credentials used to mint JWT assertions.
    sa_key: ServiceAccountKey,
    // RSA key parsed once at construction so signing can't fail later.
    encoding_key: EncodingKey,
    /// Cached OAuth2 access token (refreshes ~5 min before expiry)
    cached_token: Arc<RwLock<Option<CachedToken>>>,
}
72
73impl std::fmt::Debug for VertexAnthropicProvider {
74    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
75        f.debug_struct("VertexAnthropicProvider")
76            .field("project_id", &self.project_id)
77            .field("base_url", &self.base_url)
78            .field("client_email", &self.sa_key.client_email)
79            .finish()
80    }
81}
82
83impl VertexAnthropicProvider {
84    /// Create from a service account JSON key string
85    pub fn new(sa_json: &str, project_id: Option<String>) -> Result<Self> {
86        let sa_key: ServiceAccountKey =
87            serde_json::from_str(sa_json).context("Failed to parse service account JSON key")?;
88
89        let project_id = project_id
90            .or_else(|| sa_key.project_id.clone())
91            .ok_or_else(|| anyhow::anyhow!("No project_id found in SA key or Vault config"))?;
92
93        let encoding_key = EncodingKey::from_rsa_pem(sa_key.private_key.as_bytes())
94            .context("Failed to parse RSA private key from service account")?;
95
96        // Vertex AI Anthropic endpoint format:
97        // https://REGION-aiplatform.googleapis.com/v1/projects/PROJECT_ID/locations/REGION/publishers/anthropic/models/MODEL_ID:rawPredict
98        let base_url = format!(
99            "https://{}-aiplatform.googleapis.com/v1/projects/{}/locations/{}/publishers/anthropic/models",
100            VERTEX_REGION, project_id, VERTEX_REGION
101        );
102
103        tracing::debug!(
104            provider = "vertex-anthropic",
105            project_id = %project_id,
106            client_email = %sa_key.client_email,
107            base_url = %base_url,
108            "Creating Vertex Anthropic provider with service account"
109        );
110
111        let client = Client::builder()
112            .connect_timeout(CONNECT_TIMEOUT)
113            .timeout(REQUEST_TIMEOUT)
114            .build()
115            .context("Failed to build HTTP client")?;
116
117        Ok(Self {
118            client,
119            project_id,
120            base_url,
121            sa_key,
122            encoding_key,
123            cached_token: Arc::new(RwLock::new(None)),
124        })
125    }
126
127    /// Get a valid OAuth2 access token, refreshing if needed
128    async fn get_access_token(&self) -> Result<String> {
129        // Check cache — refresh 5 minutes before expiration
130        {
131            let cache = self.cached_token.read().await;
132            if let Some(ref cached) = *cache
133                && cached.expires_at
134                    > std::time::Instant::now() + std::time::Duration::from_secs(300)
135            {
136                return Ok(cached.token.clone());
137            }
138        }
139
140        // Sign a JWT assertion
141        let now = std::time::SystemTime::now()
142            .duration_since(std::time::UNIX_EPOCH)
143            .context("System time error")?
144            .as_secs();
145
146        let token_uri = self.sa_key.token_uri.as_deref().unwrap_or(GOOGLE_TOKEN_URL);
147
148        let claims = JwtClaims {
149            iss: self.sa_key.client_email.clone(),
150            scope: VERTEX_SCOPE.to_string(),
151            aud: token_uri.to_string(),
152            iat: now,
153            exp: now + 3600,
154        };
155
156        let header = Header::new(Algorithm::RS256);
157        let assertion = jsonwebtoken::encode(&header, &claims, &self.encoding_key)
158            .context("Failed to sign JWT assertion")?;
159
160        // Exchange JWT for access token
161        let form_body = format!(
162            "grant_type={}&assertion={}",
163            urlencoding::encode("urn:ietf:params:oauth:grant-type:jwt-bearer"),
164            urlencoding::encode(&assertion),
165        );
166        let response = self
167            .client
168            .post(token_uri)
169            .header("Content-Type", "application/x-www-form-urlencoded")
170            .body(form_body)
171            .send()
172            .await
173            .context("Failed to exchange JWT for access token")?;
174
175        let status = response.status();
176        let body = response
177            .text()
178            .await
179            .context("Failed to read token response")?;
180
181        if !status.is_success() {
182            anyhow::bail!("GCP token exchange failed: {status} {body}");
183        }
184
185        #[derive(Deserialize)]
186        struct TokenResponse {
187            access_token: String,
188            #[serde(default)]
189            expires_in: Option<u64>,
190        }
191
192        let token_resp: TokenResponse =
193            serde_json::from_str(&body).context("Failed to parse GCP token response")?;
194
195        let expires_in = token_resp.expires_in.unwrap_or(3600);
196
197        // Cache it
198        {
199            let mut cache = self.cached_token.write().await;
200            *cache = Some(CachedToken {
201                token: token_resp.access_token.clone(),
202                expires_at: std::time::Instant::now() + std::time::Duration::from_secs(expires_in),
203            });
204        }
205
206        tracing::debug!(
207            client_email = %self.sa_key.client_email,
208            expires_in_secs = expires_in,
209            "Refreshed GCP access token via service account JWT"
210        );
211
212        Ok(token_resp.access_token)
213    }
214
215    /// Convert our generic messages to Anthropic Messages API format.
216    /// Same logic as AnthropicProvider, but returns system as Option<Value> for Vertex format.
217    fn convert_messages(messages: &[Message]) -> (Option<Value>, Vec<Value>) {
218        let mut system_blocks: Vec<Value> = Vec::new();
219        let mut api_messages: Vec<Value> = Vec::new();
220
221        for msg in messages {
222            match msg.role {
223                Role::System => {
224                    for part in &msg.content {
225                        match part {
226                            ContentPart::Text { text } => {
227                                system_blocks.push(json!({
228                                    "type": "text",
229                                    "text": text,
230                                }));
231                            }
232                            ContentPart::Thinking { text } => {
233                                system_blocks.push(json!({
234                                    "type": "thinking",
235                                    "thinking": text,
236                                }));
237                            }
238                            _ => {}
239                        }
240                    }
241                }
242                Role::User => {
243                    let mut content_parts: Vec<Value> = Vec::new();
244                    for part in &msg.content {
245                        match part {
246                            ContentPart::Text { text } => {
247                                content_parts.push(json!({
248                                    "type": "text",
249                                    "text": text,
250                                }));
251                            }
252                            ContentPart::Thinking { text } => {
253                                content_parts.push(json!({
254                                    "type": "thinking",
255                                    "thinking": text,
256                                }));
257                            }
258                            _ => {}
259                        }
260                    }
261                    if content_parts.is_empty() {
262                        content_parts.push(json!({"type": "text", "text": " "}));
263                    }
264                    api_messages.push(json!({
265                        "role": "user",
266                        "content": content_parts
267                    }));
268                }
269                Role::Assistant => {
270                    let mut content_parts: Vec<Value> = Vec::new();
271
272                    for part in &msg.content {
273                        match part {
274                            ContentPart::Text { text } => {
275                                content_parts.push(json!({
276                                    "type": "text",
277                                    "text": text
278                                }));
279                            }
280                            ContentPart::Thinking { text } => {
281                                content_parts.push(json!({
282                                    "type": "thinking",
283                                    "thinking": text
284                                }));
285                            }
286                            ContentPart::ToolCall {
287                                id,
288                                name,
289                                arguments,
290                                ..
291                            } => {
292                                let input: Value = serde_json::from_str(arguments)
293                                    .unwrap_or_else(|_| json!({"raw": arguments}));
294                                content_parts.push(json!({
295                                    "type": "tool_use",
296                                    "id": id,
297                                    "name": name,
298                                    "input": input
299                                }));
300                            }
301                            _ => {}
302                        }
303                    }
304
305                    if content_parts.is_empty() {
306                        content_parts.push(json!({"type": "text", "text": " "}));
307                    }
308
309                    api_messages.push(json!({
310                        "role": "assistant",
311                        "content": content_parts
312                    }));
313                }
314                Role::Tool => {
315                    let mut tool_results: Vec<Value> = Vec::new();
316                    for part in &msg.content {
317                        if let ContentPart::ToolResult {
318                            tool_call_id,
319                            content,
320                        } = part
321                        {
322                            tool_results.push(json!({
323                                "type": "tool_result",
324                                "tool_use_id": tool_call_id,
325                                "content": content
326                            }));
327                        }
328                    }
329                    if !tool_results.is_empty() {
330                        api_messages.push(json!({
331                            "role": "user",
332                            "content": tool_results
333                        }));
334                    }
335                }
336            }
337        }
338
339        let system = if system_blocks.is_empty() {
340            None
341        } else if system_blocks.len() == 1 {
342            // Single block: return as string for simplicity
343            system_blocks
344                .first()
345                .and_then(|b| b.get("text"))
346                .and_then(Value::as_str)
347                .map(|s| json!(s))
348        } else {
349            // Multiple blocks: return as array
350            Some(json!(system_blocks))
351        };
352
353        (system, api_messages)
354    }
355
356    fn convert_tools(tools: &[ToolDefinition]) -> Vec<Value> {
357        tools
358            .iter()
359            .map(|t| {
360                json!({
361                    "name": t.name,
362                    "description": t.description,
363                    "input_schema": t.parameters
364                })
365            })
366            .collect()
367    }
368
369    /// Build the Vertex AI URL for a specific model
370    fn build_model_url(&self, model: &str) -> String {
371        // Normalize model ID to Vertex format
372        let model_id = model
373            .trim_start_matches("vertex-anthropic/")
374            .trim_start_matches("anthropic/")
375            .trim_start_matches("claude-");
376
377        // Handle both formats: "claude-sonnet-4-6" and "sonnet-4-6"
378        let final_model_id = if model_id.starts_with("claude-") {
379            model_id.to_string()
380        } else {
381            format!("claude-{model_id}")
382        };
383
384        format!("{}/{}:rawPredict", self.base_url, final_model_id)
385    }
386}
387
// Response types (same as Anthropic)

/// Top-level non-streaming Messages API response body.
#[derive(Debug, Deserialize)]
struct AnthropicResponse {
    #[allow(dead_code)]
    id: String,
    #[allow(dead_code)]
    model: String,
    // Ordered content blocks (text / thinking / tool_use).
    content: Vec<AnthropicContent>,
    // e.g. "end_turn", "max_tokens", "tool_use"; may be absent.
    #[serde(default)]
    stop_reason: Option<String>,
    #[serde(default)]
    usage: Option<AnthropicUsage>,
}
401
/// One content block in an Anthropic Messages response, discriminated by the
/// JSON `type` field.
#[derive(Debug, Deserialize)]
#[serde(tag = "type")]
enum AnthropicContent {
    // Plain assistant text.
    #[serde(rename = "text")]
    Text { text: String },
    // Extended-thinking block; some payloads carry `thinking`, others
    // `text`, so both are accepted and reconciled by the caller.
    #[serde(rename = "thinking")]
    Thinking {
        #[serde(default)]
        thinking: Option<String>,
        #[serde(default)]
        text: Option<String>,
    },
    // Tool invocation requested by the model.
    #[serde(rename = "tool_use")]
    ToolUse {
        id: String,
        name: String,
        input: Value,
    },
    // Unrecognized block types deserialize here instead of failing parsing.
    #[serde(other)]
    Unknown,
}
423
/// Token accounting reported by the API; counters default when missing.
#[derive(Debug, Deserialize)]
struct AnthropicUsage {
    #[serde(default)]
    input_tokens: usize,
    #[serde(default)]
    output_tokens: usize,
    // Prompt-caching counters; present only when caching was involved.
    #[serde(default)]
    cache_creation_input_tokens: Option<usize>,
    #[serde(default)]
    cache_read_input_tokens: Option<usize>,
}
435
/// Error envelope returned by the Anthropic-on-Vertex API.
#[derive(Debug, Deserialize)]
struct AnthropicError {
    error: AnthropicErrorDetail,
}
440
/// Error payload: human-readable message plus machine-readable category.
#[derive(Debug, Deserialize)]
struct AnthropicErrorDetail {
    message: String,
    // JSON field is `type` (a Rust keyword), hence the rename.
    #[serde(default, rename = "type")]
    error_type: Option<String>,
}
447
448#[async_trait]
449impl Provider for VertexAnthropicProvider {
    /// Stable identifier used to route requests to this provider.
    fn name(&self) -> &str {
        "vertex-anthropic"
    }
453
454    async fn list_models(&self) -> Result<Vec<ModelInfo>> {
455        Ok(vec![
456            ModelInfo {
457                id: "claude-sonnet-4-6".to_string(),
458                name: "Claude Sonnet 4.6 (Vertex AI)".to_string(),
459                provider: "vertex-anthropic".to_string(),
460                context_window: 200_000,
461                max_output_tokens: Some(128_000),
462                supports_vision: true,
463                supports_tools: true,
464                supports_streaming: true,
465                input_cost_per_million: Some(3.0),
466                output_cost_per_million: Some(15.0),
467            },
468            ModelInfo {
469                id: "claude-sonnet-4-20250514".to_string(),
470                name: "Claude Sonnet 4 (Vertex AI)".to_string(),
471                provider: "vertex-anthropic".to_string(),
472                context_window: 200_000,
473                max_output_tokens: Some(64_000),
474                supports_vision: true,
475                supports_tools: true,
476                supports_streaming: true,
477                input_cost_per_million: Some(3.0),
478                output_cost_per_million: Some(15.0),
479            },
480            ModelInfo {
481                id: "claude-opus-4-20250514".to_string(),
482                name: "Claude Opus 4 (Vertex AI)".to_string(),
483                provider: "vertex-anthropic".to_string(),
484                context_window: 200_000,
485                max_output_tokens: Some(32_000),
486                supports_vision: true,
487                supports_tools: true,
488                supports_streaming: true,
489                input_cost_per_million: Some(15.0),
490                output_cost_per_million: Some(75.0),
491            },
492            ModelInfo {
493                id: "claude-3-5-sonnet-v2@20241022".to_string(),
494                name: "Claude 3.5 Sonnet v2 (Vertex AI)".to_string(),
495                provider: "vertex-anthropic".to_string(),
496                context_window: 200_000,
497                max_output_tokens: Some(8_192),
498                supports_vision: true,
499                supports_tools: true,
500                supports_streaming: true,
501                input_cost_per_million: Some(3.0),
502                output_cost_per_million: Some(15.0),
503            },
504            ModelInfo {
505                id: "claude-3-5-sonnet@20240620".to_string(),
506                name: "Claude 3.5 Sonnet (Vertex AI)".to_string(),
507                provider: "vertex-anthropic".to_string(),
508                context_window: 200_000,
509                max_output_tokens: Some(8_192),
510                supports_vision: true,
511                supports_tools: true,
512                supports_streaming: true,
513                input_cost_per_million: Some(3.0),
514                output_cost_per_million: Some(15.0),
515            },
516            ModelInfo {
517                id: "claude-3-haiku@20240307".to_string(),
518                name: "Claude 3 Haiku (Vertex AI)".to_string(),
519                provider: "vertex-anthropic".to_string(),
520                context_window: 200_000,
521                max_output_tokens: Some(4_096),
522                supports_vision: true,
523                supports_tools: true,
524                supports_streaming: true,
525                input_cost_per_million: Some(0.25),
526                output_cost_per_million: Some(1.25),
527            },
528        ])
529    }
530
    /// Send a non-streaming completion request to Claude on Vertex AI.
    ///
    /// Flow: obtain/reuse the OAuth token, translate the generic request to
    /// Anthropic Messages format, POST to the model-specific `:rawPredict`
    /// URL, and retry with exponential backoff on request timeouts and 503
    /// responses (up to MAX_RETRIES attempts total).
    async fn complete(&self, request: CompletionRequest) -> Result<CompletionResponse> {
        // Fetched once for all attempts; token TTL (~1h) far exceeds the
        // retry window.
        let access_token = self.get_access_token().await?;

        let (system_prompt, messages) = Self::convert_messages(&request.messages);
        let tools = Self::convert_tools(&request.tools);

        // Build request body - note: model is NOT in body for Vertex, it's in URL
        let mut body = json!({
            "anthropic_version": VERTEX_ANTHROPIC_VERSION,
            "messages": messages,
            "max_tokens": request.max_tokens.unwrap_or(8192),
        });

        // Optional fields are only attached when present, so the wire format
        // stays minimal.
        if let Some(system) = system_prompt {
            body["system"] = system;
        }
        if !tools.is_empty() {
            body["tools"] = json!(tools);
        }
        if let Some(temp) = request.temperature {
            body["temperature"] = json!(temp);
        }
        if let Some(top_p) = request.top_p {
            body["top_p"] = json!(top_p);
        }

        let url = self.build_model_url(&request.model);

        tracing::debug!(
            model = %request.model,
            url = %url,
            "Vertex Anthropic request"
        );

        let mut last_err = None;

        for attempt in 0..MAX_RETRIES {
            if attempt > 0 {
                // Exponential backoff: 1s, 2s, ... (attempt >= 1 here, so
                // the `attempt - 1` exponent cannot underflow).
                let backoff = Duration::from_millis(1000 * 2u64.pow(attempt - 1));
                tracing::warn!(
                    attempt,
                    backoff_ms = backoff.as_millis() as u64,
                    "Vertex Anthropic retrying after transient error"
                );
                tokio::time::sleep(backoff).await;
            }

            let send_result = self
                .client
                .post(&url)
                .bearer_auth(&access_token)
                .header("Content-Type", "application/json")
                .json(&body)
                .send()
                .await;

            let response = match send_result {
                Ok(r) => r,
                // Timeouts are retried; any other transport error aborts
                // immediately.
                Err(e) if e.is_timeout() && attempt + 1 < MAX_RETRIES => {
                    tracing::warn!(error = %e, "Vertex Anthropic request timed out");
                    last_err = Some(format!("Request timed out: {e}"));
                    continue;
                }
                Err(e) => anyhow::bail!("Failed to send request to Vertex AI Anthropic: {e}"),
            };

            let status = response.status();
            let text = response
                .text()
                .await
                .context("Failed to read Vertex AI Anthropic response")?;

            // 503 is the only retryable HTTP status; on the final attempt it
            // falls through to the generic error path below.
            if status == reqwest::StatusCode::SERVICE_UNAVAILABLE && attempt + 1 < MAX_RETRIES {
                tracing::warn!(status = %status, body = %text, "Vertex Anthropic service unavailable, retrying");
                last_err = Some(format!("503 Service Unavailable: {text}"));
                continue;
            }

            if !status.is_success() {
                // Prefer the structured error body when it parses.
                if let Ok(err) = serde_json::from_str::<AnthropicError>(&text) {
                    anyhow::bail!(
                        "Vertex AI Anthropic API error: {} ({:?})",
                        err.error.message,
                        err.error.error_type
                    );
                }
                anyhow::bail!("Vertex AI Anthropic API error: {} {}", status, text);
            }

            let response: AnthropicResponse = serde_json::from_str(&text).context(format!(
                "Failed to parse Vertex AI Anthropic response: {}",
                util::truncate_bytes_safe(&text, 200)
            ))?;

            // Translate content blocks to generic parts; empty text /
            // thinking blocks are dropped.
            let mut content = Vec::new();
            let mut has_tool_calls = false;

            for part in &response.content {
                match part {
                    AnthropicContent::Text { text } => {
                        if !text.is_empty() {
                            content.push(ContentPart::Text { text: text.clone() });
                        }
                    }
                    AnthropicContent::Thinking { thinking, text } => {
                        // Payloads vary: prefer `thinking`, fall back to `text`.
                        let reasoning = thinking
                            .as_deref()
                            .or(text.as_deref())
                            .unwrap_or_default()
                            .trim()
                            .to_string();
                        if !reasoning.is_empty() {
                            content.push(ContentPart::Thinking { text: reasoning });
                        }
                    }
                    AnthropicContent::ToolUse { id, name, input } => {
                        has_tool_calls = true;
                        content.push(ContentPart::ToolCall {
                            id: id.clone(),
                            name: name.clone(),
                            arguments: serde_json::to_string(input).unwrap_or_default(),
                            thought_signature: None,
                        });
                    }
                    AnthropicContent::Unknown => {}
                }
            }

            // Any tool_use block forces ToolCalls regardless of stop_reason.
            let finish_reason = if has_tool_calls {
                FinishReason::ToolCalls
            } else {
                match response.stop_reason.as_deref() {
                    Some("end_turn") | Some("stop") => FinishReason::Stop,
                    Some("max_tokens") => FinishReason::Length,
                    Some("tool_use") => FinishReason::ToolCalls,
                    Some("content_filter") => FinishReason::ContentFilter,
                    _ => FinishReason::Stop,
                }
            };

            let usage = response.usage.as_ref();

            return Ok(CompletionResponse {
                message: Message {
                    role: Role::Assistant,
                    content,
                },
                usage: Usage {
                    prompt_tokens: usage.map(|u| u.input_tokens).unwrap_or(0),
                    completion_tokens: usage.map(|u| u.output_tokens).unwrap_or(0),
                    total_tokens: usage.map(|u| u.input_tokens + u.output_tokens).unwrap_or(0),
                    cache_read_tokens: usage.and_then(|u| u.cache_read_input_tokens),
                    cache_write_tokens: usage.and_then(|u| u.cache_creation_input_tokens),
                },
                finish_reason,
            });
        }

        // Only reachable when every attempt ended in a retryable failure.
        anyhow::bail!(
            "Vertex AI Anthropic request failed after {MAX_RETRIES} attempts: {}",
            last_err.unwrap_or_default()
        )
    }
694
695    async fn complete_stream(
696        &self,
697        request: CompletionRequest,
698    ) -> Result<futures::stream::BoxStream<'static, StreamChunk>> {
699        let access_token = self.get_access_token().await?;
700
701        let (system_prompt, messages) = Self::convert_messages(&request.messages);
702        let tools = Self::convert_tools(&request.tools);
703
704        let mut body = json!({
705            "anthropic_version": VERTEX_ANTHROPIC_VERSION,
706            "messages": messages,
707            "max_tokens": request.max_tokens.unwrap_or(8192),
708            "stream": true,
709        });
710
711        if let Some(system) = system_prompt {
712            body["system"] = system;
713        }
714        if !tools.is_empty() {
715            body["tools"] = json!(tools);
716        }
717        if let Some(temp) = request.temperature {
718            body["temperature"] = json!(temp);
719        }
720
721        let url = self.build_model_url(&request.model);
722
723        tracing::debug!(model = %request.model, "Vertex Anthropic streaming request");
724
725        let response = self
726            .client
727            .post(&url)
728            .bearer_auth(&access_token)
729            .header("Content-Type", "application/json")
730            .json(&body)
731            .send()
732            .await
733            .context("Failed to send streaming request to Vertex AI Anthropic")?;
734
735        if !response.status().is_success() {
736            let status = response.status();
737            let text = response.text().await.unwrap_or_default();
738            if let Ok(err) = serde_json::from_str::<AnthropicError>(&text) {
739                anyhow::bail!(
740                    "Vertex AI Anthropic API error: {} ({:?})",
741                    err.error.message,
742                    err.error.error_type
743                );
744            }
745            anyhow::bail!("Vertex AI Anthropic streaming error: {} {}", status, text);
746        }
747
748        let stream = response.bytes_stream();
749        let mut buffer = String::new();
750
751        Ok(stream
752            .flat_map(move |chunk_result| {
753                let mut chunks: Vec<StreamChunk> = Vec::new();
754                match chunk_result {
755                    Ok(bytes) => {
756                        let text = String::from_utf8_lossy(&bytes);
757                        buffer.push_str(&text);
758
759                        let mut text_buf = String::new();
760
761                        while let Some(line_end) = buffer.find('\n') {
762                            let line = buffer[..line_end].trim().to_string();
763                            buffer = buffer[line_end + 1..].to_string();
764
765                            // Handle SSE format
766                            if line.starts_with("event:") {
767                                continue; // Skip event type lines
768                            }
769
770                            if let Some(data) = line.strip_prefix("data: ") {
771                                if data == "[DONE]" {
772                                    if !text_buf.is_empty() {
773                                        chunks
774                                            .push(StreamChunk::Text(std::mem::take(&mut text_buf)));
775                                    }
776                                    chunks.push(StreamChunk::Done { usage: None });
777                                    continue;
778                                }
779
780                                // Parse Anthropic streaming event
781                                if let Ok(event) = serde_json::from_str::<Value>(data) {
782                                    let event_type =
783                                        event.get("type").and_then(Value::as_str).unwrap_or("");
784
785                                    match event_type {
786                                        "content_block_delta" => {
787                                            if let Some(delta) = event.get("delta")
788                                                && let Some(text) =
789                                                    delta.get("text").and_then(Value::as_str)
790                                            {
791                                                text_buf.push_str(text);
792                                            }
793                                        }
794                                        "content_block_start" => {
795                                            // Tool call start
796                                            if let Some(content_block) = event.get("content_block")
797                                                && content_block.get("type").and_then(Value::as_str)
798                                                    == Some("tool_use")
799                                            {
800                                                if !text_buf.is_empty() {
801                                                    chunks.push(StreamChunk::Text(std::mem::take(
802                                                        &mut text_buf,
803                                                    )));
804                                                }
805                                                let id = content_block
806                                                    .get("id")
807                                                    .and_then(Value::as_str)
808                                                    .unwrap_or_default();
809                                                let name = content_block
810                                                    .get("name")
811                                                    .and_then(Value::as_str)
812                                                    .unwrap_or_default();
813                                                chunks.push(StreamChunk::ToolCallStart {
814                                                    id: id.to_string(),
815                                                    name: name.to_string(),
816                                                });
817                                            }
818                                        }
819                                        "content_block_stop" => {
820                                            // Tool call end
821                                            let index = event.get("index").and_then(Value::as_u64);
822                                            if let Some(_idx) = index {
823                                                // We'd need to track tool call IDs, for now emit end
824                                                // This is a simplification - real impl needs state tracking
825                                            }
826                                        }
827                                        "message_delta" => {
828                                            // Final stop reason
829                                            if let Some(_usage) = event.get("usage") {
830                                                // Could extract usage here
831                                            }
832                                        }
833                                        "message_stop" => {
834                                            if !text_buf.is_empty() {
835                                                chunks.push(StreamChunk::Text(std::mem::take(
836                                                    &mut text_buf,
837                                                )));
838                                            }
839                                            chunks.push(StreamChunk::Done { usage: None });
840                                        }
841                                        _ => {}
842                                    }
843                                }
844                            }
845                        }
846                        if !text_buf.is_empty() {
847                            chunks.push(StreamChunk::Text(text_buf));
848                        }
849                    }
850                    Err(e) => chunks.push(StreamChunk::Error(e.to_string())),
851                }
852                futures::stream::iter(chunks)
853            })
854            .boxed())
855    }
856}
857
#[cfg(test)]
mod tests {
    use super::*;

    /// Malformed service-account JSON must fail fast at construction time.
    #[test]
    fn test_rejects_invalid_sa_json() {
        assert!(VertexAnthropicProvider::new("{}", None).is_err());
    }

    /// A key that omits `project_id` — with no explicit project override —
    /// must be rejected, since the Vertex endpoint cannot be derived.
    #[test]
    fn test_rejects_missing_project_id() {
        let key_without_project = json!({
            "type": "service_account",
            "client_email": "test@test.iam.gserviceaccount.com",
            "private_key": "-----BEGIN RSA PRIVATE KEY-----\nMIIBogIBAAJBALRiMLAHudeSA/x3hB2f+2NRkJlS\n-----END RSA PRIVATE KEY-----\n",
            "token_uri": "https://oauth2.googleapis.com/token"
        })
        .to_string();
        assert!(VertexAnthropicProvider::new(&key_without_project, None).is_err());
    }

    /// Placeholder: exercising the model-URL builder needs a valid service
    /// account key (or a mocked provider), so no assertions are made here yet.
    #[test]
    fn test_model_url_building() {
        // This would require a valid SA key, so just test the logic conceptually
        // In a real test, we'd mock the provider
    }
}