Skip to main content

matrixcode_core/providers/
anthropic.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use futures_util::StreamExt;
4use log::debug;
5use serde_json::{Value, json};
6use tokio::sync::mpsc;
7
8use crate::models::context_window_for;
9use crate::tools::ToolDefinition;
10
11use super::{
12    ChatRequest, ChatResponse, ContentBlock, Message, MessageContent, Provider, Role, StopReason,
13    StreamEvent, Usage,
14};
15
/// Chat provider that posts requests to an Anthropic-style `/v1/messages`
/// endpoint, with a compatibility mode for Aliyun DashScope.
pub struct AnthropicProvider {
    /// Credential sent as `x-api-key`, or as a `Bearer` token for DashScope.
    api_key: String,
    /// Model identifier placed in the request body's `model` field.
    model: String,
    /// Endpoint root; `/v1/messages` is appended to it for every request.
    base_url: String,
    /// HTTP client used for all requests issued by this provider.
    client: reqwest::Client,
    /// Whether this is an Aliyun DashScope endpoint (requires special headers).
    is_dashscope: bool,
}
24
25impl AnthropicProvider {
26    pub fn new(api_key: String, model: String, base_url: String) -> Self {
27        let is_dashscope = base_url.contains("dashscope.aliyuncs.com");
28        let client = reqwest::Client::builder()
29            .timeout(std::time::Duration::from_secs(120))
30            .connect_timeout(std::time::Duration::from_secs(10))
31            .danger_accept_invalid_certs(true)
32            .build()
33            .unwrap_or_else(|_| reqwest::Client::new());
34        Self {
35            api_key,
36            model,
37            base_url,
38            client,
39            is_dashscope,
40        }
41    }
42
43    fn convert_messages(&self, messages: &[Message]) -> Vec<Value> {
44        messages
45            .iter()
46            .filter(|m| m.role != Role::System)
47            .map(|m| {
48                let role = match m.role {
49                    Role::User | Role::Tool => "user",
50                    Role::Assistant => "assistant",
51                    Role::System => unreachable!(),
52                };
53
54                let content = match &m.content {
55                    MessageContent::Text(text) => json!(text),
56                    MessageContent::Blocks(blocks) => {
57                        let converted: Vec<Value> = blocks
58                            .iter()
59                            .map(|b| match b {
60                                ContentBlock::Text { text } => json!({"type": "text", "text": text}),
61                                ContentBlock::ToolUse { id, name, input } => {
62                                    json!({"type": "tool_use", "id": id, "name": name, "input": input})
63                                }
64                                ContentBlock::ToolResult { tool_use_id, content } => {
65                                    json!({"type": "tool_result", "tool_use_id": tool_use_id, "content": content})
66                                }
67                                ContentBlock::Thinking { thinking, signature } => {
68                                    let mut obj = json!({"type": "thinking", "thinking": thinking});
69                                    if let Some(sig) = signature {
70                                        obj["signature"] = json!(sig);
71                                    }
72                                    obj
73                                }
74                                ContentBlock::ServerToolUse { id, name, input } => {
75                                    json!({"type": "server_tool_use", "id": id, "name": name, "input": input})
76                                }
77                                ContentBlock::WebSearchResult { tool_use_id, content } => {
78                                    json!({"type": "web_search_tool_result", "tool_use_id": tool_use_id, "content": content})
79                                }
80                            })
81                            .collect();
82                        json!(converted)
83                    }
84                };
85
86                json!({"role": role, "content": content})
87            })
88            .collect()
89    }
90
91    /// Convert tools with caching control for Anthropic prompt caching.
92    fn convert_tools_with_caching(
93        &self,
94        tools: &[ToolDefinition],
95        enable_caching: bool,
96    ) -> Vec<Value> {
97        let mut converted: Vec<Value> = tools
98            .iter()
99            .map(|t| {
100                json!({
101                    "name": t.name,
102                    "description": t.description,
103                    "input_schema": t.parameters,
104                })
105            })
106            .collect();
107
108        // Add cache_control to the last tool definition for tools caching
109        if enable_caching && !converted.is_empty() {
110            let last_idx = converted.len() - 1;
111            if let Some(obj) = converted[last_idx].as_object_mut() {
112                obj.insert("cache_control".to_string(), json!({"type": "ephemeral"}));
113            }
114        }
115
116        converted
117    }
118
119    /// Build the base JSON body shared by streaming and non-streaming requests.
120    fn build_body(&self, request: &ChatRequest) -> Value {
121        let mut body = json!({
122            "model": self.model,
123            "max_tokens": request.max_tokens,
124            "messages": self.convert_messages(&request.messages),
125        });
126
127        // Add prompt caching for system prompt (Anthropic-specific)
128        if request.enable_caching && !self.is_dashscope {
129            if let Some(system) = &request.system {
130                // System prompt caching: add cache_control to enable caching
131                body["system"] = json!([
132                    {
133                        "type": "text",
134                        "text": system,
135                        "cache_control": {"type": "ephemeral"}
136                    }
137                ]);
138            }
139        } else if let Some(system) = &request.system {
140            body["system"] = json!(system);
141        }
142
143        if !request.tools.is_empty() {
144            let tools = self.convert_tools_with_caching(
145                &request.tools,
146                request.enable_caching && !self.is_dashscope,
147            );
148            body["tools"] = json!(tools);
149        }
150
151        if !request.server_tools.is_empty() {
152            body["tools"] = json!(
153                body["tools"]
154                    .as_array()
155                    .map(|t| {
156                        let mut tools = t.clone();
157                        for st in &request.server_tools {
158                            tools.push(serde_json::to_value(st).unwrap_or_default());
159                        }
160                        tools
161                    })
162                    .unwrap_or_else(|| request
163                        .server_tools
164                        .iter()
165                        .map(|st| serde_json::to_value(st).unwrap_or_default())
166                        .collect())
167            );
168        }
169
170        // DashScope does not support Anthropic's extended thinking feature
171        if request.think && !self.is_dashscope {
172            let config = thinking_config(&self.model);
173            log::debug!(
174                "Adding thinking config for model {}: {:?}",
175                self.model,
176                config
177            );
178            body["thinking"] = config;
179        } else if !request.think {
180            log::debug!("Thinking disabled by request.think=false");
181        } else if self.is_dashscope {
182            log::debug!("Thinking disabled for DashScope");
183        }
184
185        body
186    }
187}
188
189/// Models that require the new `adaptive` thinking mode instead of the
190/// legacy `enabled`+`budget_tokens` form. Conservative allow-list: if we
191/// don't recognize the name we default to the legacy shape (which older
192/// models and most third-party gateways understand).
193fn thinking_config(model: &str) -> Value {
194    let m = model.to_lowercase();
195    // New models (2025+) use adaptive thinking
196    let adaptive = m.contains("opus-4")
197        || m.contains("sonnet-4")
198        || m.contains("claude-4")
199        || m.contains("20250")
200        || m.contains("2025");
201    if adaptive {
202        json!({"type": "enabled", "budget_tokens": 10000})
203    } else {
204        json!({"type": "enabled", "budget_tokens": 5000})
205    }
206}
207
208#[async_trait]
209impl Provider for AnthropicProvider {
    /// Context window for the configured model, as reported by
    /// `context_window_for`.
    fn context_size(&self) -> Option<u32> {
        context_window_for(&self.model)
    }
213
214    fn clone_box(&self) -> Box<dyn Provider> {
215        Box::new(Self {
216            api_key: self.api_key.clone(),
217            model: self.model.clone(),
218            base_url: self.base_url.clone(),
219            client: reqwest::Client::new(),
220            is_dashscope: self.is_dashscope,
221        })
222    }
223
224    async fn chat(&self, request: ChatRequest) -> Result<ChatResponse> {
225        let body = self.build_body(&request);
226
227        let url = format!("{}/v1/messages", self.base_url);
228        let mut req = self
229            .client
230            .post(&url)
231            .header("User-Agent", "curl/8.0")
232            .json(&body);
233
234        // DashScope uses Bearer auth
235        if self.is_dashscope {
236            req = req.header("Authorization", format!("Bearer {}", self.api_key));
237        } else {
238            req = req
239                .header("x-api-key", &self.api_key)
240                .header("anthropic-version", "2025-04-15")
241                .header("anthropic-beta", "prompt-caching-2024-07-31");
242        }
243
244        let response = req.send().await?;
245
246        let status = response.status();
247        let response_body: Value = response.json().await?;
248
249        if !status.is_success() {
250            let err_msg = response_body["error"]["message"]
251                .as_str()
252                .unwrap_or("unknown error");
253            anyhow::bail!("Anthropic API error ({}): {}", status, err_msg);
254        }
255
256        let stop_reason = match response_body["stop_reason"].as_str() {
257            Some("tool_use") => StopReason::ToolUse,
258            Some("max_tokens") => StopReason::MaxTokens,
259            _ => StopReason::EndTurn,
260        };
261
262        let content = response_body["content"]
263            .as_array()
264            .unwrap_or(&vec![])
265            .iter()
266            .filter_map(|block| match block["type"].as_str()? {
267                "text" => Some(ContentBlock::Text {
268                    text: block["text"].as_str()?.to_string(),
269                }),
270                "tool_use" => Some(ContentBlock::ToolUse {
271                    id: block["id"].as_str()?.to_string(),
272                    name: block["name"].as_str()?.to_string(),
273                    input: block["input"].clone(),
274                }),
275                "thinking" => Some(ContentBlock::Thinking {
276                    thinking: block["thinking"].as_str()?.to_string(),
277                    signature: block["signature"].as_str().map(String::from),
278                }),
279                "server_tool_use" => Some(ContentBlock::ServerToolUse {
280                    id: block["id"].as_str()?.to_string(),
281                    name: block["name"].as_str()?.to_string(),
282                    input: block["input"].clone(),
283                }),
284                "web_search_tool_result" => {
285                    let tool_use_id = block["tool_use_id"].as_str()?.to_string();
286                    let content = parse_web_search_content(&block["content"]);
287                    Some(ContentBlock::WebSearchResult {
288                        tool_use_id,
289                        content,
290                    })
291                }
292                _ => None,
293            })
294            .collect();
295
296        Ok(ChatResponse {
297            content,
298            stop_reason,
299            usage: parse_usage(&response_body["usage"]),
300        })
301    }
302
303    async fn chat_stream(&self, request: ChatRequest) -> Result<mpsc::Receiver<StreamEvent>> {
304        let mut body = self.build_body(&request);
305        body["stream"] = json!(true);
306
307        let url = format!("{}/v1/messages", self.base_url);
308        let mut req = self
309            .client
310            .post(&url)
311            .header("User-Agent", "curl/8.0")
312            .json(&body);
313
314        // DashScope uses Bearer auth and requires SSE header for streaming
315        if self.is_dashscope {
316            req = req
317                .header("Authorization", format!("Bearer {}", self.api_key))
318                .header("X-DashScope-SSE", "enable");
319        } else {
320            req = req
321                .header("x-api-key", &self.api_key)
322                .header("anthropic-version", "2025-04-15")
323                .header("anthropic-beta", "prompt-caching-2024-07-31");
324        }
325
326        let response = req.send().await?;
327
328        if !response.status().is_success() {
329            let status = response.status();
330            let text = response.text().await.unwrap_or_default();
331            anyhow::bail!("Anthropic API error ({}): {}", status, text);
332        }
333
334        let (tx, rx) = mpsc::channel(64);
335        tokio::spawn(async move {
336            let mut stream = response.bytes_stream();
337            let mut buffer = String::new();
338            let mut sent_first_byte = false;
339
340            // In-flight block assembly: index → partial data
341            let mut blocks: Vec<AssembledBlock> = Vec::new();
342            let mut stop_reason = StopReason::EndTurn;
343            let mut usage = Usage::default();
344
345            while let Some(chunk) = stream.next().await {
346                let chunk = match chunk {
347                    Ok(c) => c,
348                    Err(e) => {
349                        let _ = tx
350                            .send(StreamEvent::Error(format!("stream read error: {}", e)))
351                            .await;
352                        return;
353                    }
354                };
355
356                if !sent_first_byte {
357                    sent_first_byte = true;
358                    let _ = tx.send(StreamEvent::FirstByte).await;
359                }
360
361                buffer.push_str(&String::from_utf8_lossy(&chunk));
362
363                while let Some(frame) = take_next_sse_frame(&mut buffer) {
364                    if handle_sse_frame(&frame, &mut blocks, &mut stop_reason, &mut usage, &tx)
365                        .await
366                    {
367                        return;
368                    }
369                }
370            }
371
372            if let Some(frame) = take_trailing_sse_frame(&mut buffer)
373                && handle_sse_frame(&frame, &mut blocks, &mut stop_reason, &mut usage, &tx).await
374            {
375                return;
376            }
377
378            if sent_first_byte {
379                debug!("stream ended without explicit message_stop; finalizing best-effort");
380                let _ = tx
381                    .send(StreamEvent::Done(finalize_incomplete_stream(
382                        std::mem::take(&mut blocks),
383                        stop_reason,
384                        usage,
385                    )))
386                    .await;
387            } else {
388                let _ = tx
389                    .send(StreamEvent::Error(
390                        "stream ended before any events were received".to_string(),
391                    ))
392                    .await;
393            }
394        });
395
396        Ok(rx)
397    }
398}
399
/// Remove and return the first complete SSE frame from `buffer`.
///
/// A frame ends at the earliest blank-line delimiter, either `\n\n` or
/// `\r\n\r\n`. The delimiter is consumed but not returned. Returns `None`
/// and leaves `buffer` untouched when no delimiter is present yet.
fn take_next_sse_frame(buffer: &mut String) -> Option<String> {
    const DELIMITERS: [&str; 2] = ["\n\n", "\r\n\r\n"];

    // Earliest delimiter wins; on a tie the first listed one is kept.
    let (frame_end, delimiter) = DELIMITERS
        .iter()
        .filter_map(|delim| buffer.find(*delim).map(|at| (at, *delim)))
        .min_by_key(|&(at, _)| at)?;

    // Split off everything after the delimiter, cut the delimiter away, and
    // swap so that `buffer` keeps the remainder while the frame is returned.
    let remainder = buffer.split_off(frame_end + delimiter.len());
    buffer.truncate(frame_end);
    Some(std::mem::replace(buffer, remainder))
}
420
/// Drain `buffer` and return whatever is left in it as a final frame.
///
/// Surrounding whitespace (including `\r` and `\n`) is stripped. Returns
/// `None` when nothing but whitespace remained. `buffer` is always emptied.
fn take_trailing_sse_frame(buffer: &mut String) -> Option<String> {
    let leftover = std::mem::take(buffer);
    let frame = leftover.trim();
    (!frame.is_empty()).then(|| frame.to_string())
}
426
/// Return the payload of the first `data:` line in an SSE frame.
///
/// A single space after the colon is optional and is not part of the
/// payload, so both `data: {...}` (Anthropic) and `data:{...}` (DashScope)
/// are accepted. Returns `None` when the frame has no `data:` line.
fn extract_sse_data_line(frame: &str) -> Option<String> {
    frame.lines().find_map(|raw| {
        // `lines()` keeps a bare trailing `\r`, so strip it explicitly.
        let payload = raw.trim_end_matches('\r').strip_prefix("data:")?;
        Some(payload.strip_prefix(' ').unwrap_or(payload).to_string())
    })
}
440
441async fn handle_sse_frame(
442    frame: &str,
443    blocks: &mut Vec<AssembledBlock>,
444    stop_reason: &mut StopReason,
445    usage: &mut Usage,
446    tx: &mpsc::Sender<StreamEvent>,
447) -> bool {
448    let Some(data_line) = extract_sse_data_line(frame) else {
449        return false;
450    };
451
452    let evt: Value = match serde_json::from_str(&data_line) {
453        Ok(v) => v,
454        Err(_) => return false,
455    };
456
457    handle_sse_event(evt, blocks, stop_reason, usage, tx).await
458}
459
460async fn handle_sse_event(
461    evt: Value,
462    blocks: &mut Vec<AssembledBlock>,
463    stop_reason: &mut StopReason,
464    usage: &mut Usage,
465    tx: &mpsc::Sender<StreamEvent>,
466) -> bool {
467    match evt["type"].as_str().unwrap_or("") {
468        "message_start" => {
469            // Initial usage payload — `input_tokens` is final
470            // (they don't grow during streaming) but
471            // `output_tokens` starts near 0 and is updated by
472            // subsequent `message_delta` events.
473            *usage = merge_usage(usage.clone(), &evt["message"]["usage"]);
474            debug!(
475                "message_start: usage_json={}",
476                serde_json::to_string(&evt["message"]["usage"]).unwrap_or_default()
477            );
478            debug!(
479                "message_start parsed: input={}, output={}, cache_read={}, cache_created={}",
480                usage.input_tokens,
481                usage.output_tokens,
482                usage.cache_read_input_tokens,
483                usage.cache_creation_input_tokens
484            );
485            // Send real-time usage update
486            let _ = tx
487                .send(StreamEvent::Usage {
488                    output_tokens: usage.output_tokens,
489                })
490                .await;
491        }
492        "content_block_start" => {
493            let idx = evt["index"].as_u64().unwrap_or(0) as usize;
494            let block = &evt["content_block"];
495            let kind = block["type"].as_str().unwrap_or("");
496            while blocks.len() <= idx {
497                blocks.push(AssembledBlock::default());
498            }
499            match kind {
500                "text" => {
501                    blocks[idx] = AssembledBlock::Text(String::new());
502                }
503                "thinking" => {
504                    blocks[idx] = AssembledBlock::Thinking {
505                        text: String::new(),
506                        signature: None,
507                    };
508                }
509                "tool_use" | "server_tool_use" => {
510                    let id = block["id"].as_str().unwrap_or_default();
511                    let name = block["name"].as_str().unwrap_or_default();
512                    let is_server = kind == "server_tool_use";
513                    blocks[idx] = if is_server {
514                        AssembledBlock::ServerToolUse {
515                            id: id.into(),
516                            name: name.into(),
517                            input_json: String::new(),
518                        }
519                    } else {
520                        AssembledBlock::ToolUse {
521                            id: id.into(),
522                            name: name.into(),
523                            input_json: String::new(),
524                        }
525                    };
526                    let _ = tx
527                        .send(StreamEvent::ToolUseStart {
528                            id: id.into(),
529                            name: name.into(),
530                        })
531                        .await;
532                }
533                "web_search_tool_result" => {
534                    let tool_use_id = block["tool_use_id"].as_str().unwrap_or("").to_string();
535                    blocks[idx] = AssembledBlock::WebSearchResult {
536                        tool_use_id,
537                        content_json: String::new(),
538                    };
539                }
540                _ => {}
541            }
542        }
543        "content_block_delta" => {
544            let idx = evt["index"].as_u64().unwrap_or(0) as usize;
545            let delta = &evt["delta"];
546            let dt = delta["type"].as_str().unwrap_or("");
547            if idx >= blocks.len() {
548                return false;
549            }
550            match (dt, &mut blocks[idx]) {
551                ("text_delta", AssembledBlock::Text(buf)) => {
552                    if let Some(t) = delta["text"].as_str() {
553                        buf.push_str(t);
554                        let _ = tx.send(StreamEvent::TextDelta(t.to_string())).await;
555                    }
556                }
557                ("thinking_delta", AssembledBlock::Thinking { text, .. }) => {
558                    if let Some(t) = delta["thinking"].as_str() {
559                        text.push_str(t);
560                        log::debug!("Received thinking_delta: {} chars", t.len());
561                        let _ = tx.send(StreamEvent::ThinkingDelta(t.to_string())).await;
562                    }
563                }
564                ("signature_delta", AssembledBlock::Thinking { signature, .. }) => {
565                    if let Some(s) = delta["signature"].as_str() {
566                        signature.get_or_insert_with(String::new).push_str(s);
567                    }
568                }
569                ("input_json_delta", AssembledBlock::ToolUse { input_json, .. }) => {
570                    if let Some(p) = delta["partial_json"].as_str() {
571                        input_json.push_str(p);
572                        let _ = tx
573                            .send(StreamEvent::ToolInputDelta {
574                                bytes_so_far: input_json.len(),
575                            })
576                            .await;
577                    }
578                }
579                ("input_json_delta", AssembledBlock::ServerToolUse { input_json, .. }) => {
580                    if let Some(p) = delta["partial_json"].as_str() {
581                        input_json.push_str(p);
582                        let _ = tx
583                            .send(StreamEvent::ToolInputDelta {
584                                bytes_so_far: input_json.len(),
585                            })
586                            .await;
587                    }
588                }
589                _ => {}
590            }
591        }
592        "message_delta" => {
593            if let Some(sr) = evt["delta"]["stop_reason"].as_str() {
594                *stop_reason = match sr {
595                    "tool_use" => StopReason::ToolUse,
596                    "max_tokens" => StopReason::MaxTokens,
597                    _ => StopReason::EndTurn,
598                };
599            }
600            // `usage` on message_delta reflects cumulative
601            // counts for the current message — most notably
602            // the final `output_tokens`.
603            *usage = merge_usage(usage.clone(), &evt["usage"]);
604            debug!(
605                "message_delta: input={}, output={}, cache_read={}, cache_created={}",
606                usage.input_tokens,
607                usage.output_tokens,
608                usage.cache_read_input_tokens,
609                usage.cache_creation_input_tokens
610            );
611            // Send real-time usage update
612            let _ = tx
613                .send(StreamEvent::Usage {
614                    output_tokens: usage.output_tokens,
615                })
616                .await;
617        }
618        "message_stop" => {
619            debug!(
620                "Message completed: stop_reason={}, usage={}",
621                match *stop_reason {
622                    StopReason::EndTurn => "end_turn",
623                    StopReason::ToolUse => "tool_use",
624                    StopReason::MaxTokens => "max_tokens",
625                },
626                usage.output_tokens
627            );
628            debug!(
629                "message_stop final usage: cache_read={}, cache_created={}",
630                usage.cache_read_input_tokens, usage.cache_creation_input_tokens
631            );
632            let _ = tx
633                .send(StreamEvent::Done(finalize_incomplete_stream(
634                    std::mem::take(blocks),
635                    stop_reason.clone(),
636                    usage.clone(),
637                )))
638                .await;
639            return true;
640        }
641        "error" => {
642            let msg = evt["error"]["message"]
643                .as_str()
644                .unwrap_or("unknown stream error")
645                .to_string();
646            let _ = tx.send(StreamEvent::Error(msg)).await;
647            return true;
648        }
649        _ => {}
650    }
651
652    false
653}
654
655fn build_chat_response(
656    blocks: Vec<AssembledBlock>,
657    stop_reason: StopReason,
658    usage: Usage,
659) -> ChatResponse {
660    let content: Vec<ContentBlock> = blocks.into_iter().filter_map(|b| b.finish()).collect();
661    ChatResponse {
662        content,
663        stop_reason,
664        usage,
665    }
666}
667
/// Build the final `ChatResponse` from whatever blocks have been assembled
/// so far. Delegates directly to `build_chat_response`; used both for a
/// normal `message_stop` and for streams that end without one.
fn finalize_incomplete_stream(
    blocks: Vec<AssembledBlock>,
    stop_reason: StopReason,
    usage: Usage,
) -> ChatResponse {
    build_chat_response(blocks, stop_reason, usage)
}
675
/// A content block being assembled from streaming events, one per block
/// index in the response.
#[derive(Default)]
enum AssembledBlock {
    /// Placeholder for an index that has not been started (or whose block
    /// type is not recognized); produces no content when finished.
    #[default]
    Empty,
    /// Text accumulated from `text_delta` events.
    Text(String),
    /// Thinking text plus its optional signature, accumulated from
    /// `thinking_delta` and `signature_delta` events.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Client tool call; `input_json` is the raw JSON text accumulated from
    /// `input_json_delta` events.
    ToolUse {
        id: String,
        name: String,
        input_json: String,
    },
    /// Server tool call; same accumulation as `ToolUse`.
    ServerToolUse {
        id: String,
        name: String,
        input_json: String,
    },
    /// Web search result; `content_json` is raw JSON text parsed when the
    /// block is finished.
    WebSearchResult {
        tool_use_id: String,
        content_json: String,
    },
}
700
701impl AssembledBlock {
702    fn finish(self) -> Option<ContentBlock> {
703        match self {
704            AssembledBlock::Empty => None,
705            AssembledBlock::Text(text) => Some(ContentBlock::Text { text }),
706            AssembledBlock::Thinking { text, signature } => Some(ContentBlock::Thinking {
707                thinking: text,
708                signature,
709            }),
710            AssembledBlock::ToolUse {
711                id,
712                name,
713                input_json,
714            } => {
715                let input: Value = if input_json.is_empty() {
716                    json!({})
717                } else {
718                    serde_json::from_str(&input_json).unwrap_or(json!({}))
719                };
720                Some(ContentBlock::ToolUse { id, name, input })
721            }
722            AssembledBlock::ServerToolUse {
723                id,
724                name,
725                input_json,
726            } => {
727                let input: Value = if input_json.is_empty() {
728                    json!({})
729                } else {
730                    serde_json::from_str(&input_json).unwrap_or(json!({}))
731                };
732                Some(ContentBlock::ServerToolUse { id, name, input })
733            }
734            AssembledBlock::WebSearchResult {
735                tool_use_id,
736                content_json,
737            } => {
738                let content: Value = if content_json.is_empty() {
739                    json!({"results": []})
740                } else {
741                    serde_json::from_str(&content_json).unwrap_or(json!({"results": []}))
742                };
743                Some(ContentBlock::WebSearchResult {
744                    tool_use_id,
745                    content: parse_web_search_content(&content),
746                })
747            }
748        }
749    }
750}
751
752/// Parse the provider's `usage` blob (non-streaming response) into our
753/// internal `Usage` struct. Missing fields default to 0.
754fn parse_usage(u: &Value) -> Usage {
755    Usage {
756        input_tokens: u["input_tokens"].as_u64().unwrap_or(0) as u32,
757        output_tokens: u["output_tokens"].as_u64().unwrap_or(0) as u32,
758        cache_creation_input_tokens: u["cache_creation_input_tokens"].as_u64().unwrap_or(0) as u32,
759        cache_read_input_tokens: u["cache_read_input_tokens"].as_u64().unwrap_or(0) as u32,
760    }
761}
762
763/// Merge a fresh usage payload into the accumulated one. Non-zero new values
764/// override prior ones — this matches the streaming protocol where
765/// `message_start` gives input counts and `message_delta` updates output.
766fn merge_usage(mut acc: Usage, u: &Value) -> Usage {
767    let fresh = parse_usage(u);
768    if fresh.input_tokens > 0 {
769        acc.input_tokens = fresh.input_tokens;
770    }
771    if fresh.output_tokens > 0 {
772        acc.output_tokens = fresh.output_tokens;
773    }
774    if fresh.cache_creation_input_tokens > 0 {
775        acc.cache_creation_input_tokens = fresh.cache_creation_input_tokens;
776    }
777    if fresh.cache_read_input_tokens > 0 {
778        acc.cache_read_input_tokens = fresh.cache_read_input_tokens;
779    }
780    acc
781}
782
783/// Parse web search content from the API response.
784fn parse_web_search_content(value: &serde_json::Value) -> crate::providers::WebSearchContent {
785    let results = value["results"]
786        .as_array()
787        .map(|arr| {
788            arr.iter()
789                .filter_map(|item| {
790                    Some(crate::providers::WebSearchResultItem {
791                        title: item["title"].as_str().map(String::from),
792                        url: item["url"].as_str()?.to_string(),
793                        encrypted_content: item["encrypted_content"].as_str().map(String::from),
794                        snippet: item["snippet"].as_str().map(String::from),
795                    })
796                })
797                .collect()
798        })
799        .unwrap_or_default();
800
801    crate::providers::WebSearchContent { results }
802}
803
/// Unit tests for the SSE framing helpers and stream finalization.
#[cfg(test)]
mod tests {
    use super::*;

    // Two CRLF-delimited frames must be split in order, leaving nothing behind.
    #[test]
    fn take_next_sse_frame_supports_crlf_delimiter() {
        let mut buffer = concat!(
            "event: message_start\r\n",
            "data: {\"type\":\"message_start\"}\r\n\r\n",
            "data: {\"type\":\"message_stop\"}\r\n\r\n"
        )
        .to_string();

        let first = take_next_sse_frame(&mut buffer).expect("first frame");
        assert!(first.contains("message_start"));

        let second = take_next_sse_frame(&mut buffer).expect("second frame");
        assert!(second.contains("message_stop"));
        assert!(buffer.is_empty());
    }

    // A final event lacking the blank-line terminator is still returned,
    // with its line ending stripped.
    #[test]
    fn take_trailing_sse_frame_returns_unterminated_event() {
        let mut buffer = "data: {\"type\":\"message_stop\"}\r\n".to_string();
        let frame = take_trailing_sse_frame(&mut buffer).expect("trailing frame");
        assert_eq!(frame, "data: {\"type\":\"message_stop\"}");
        assert!(buffer.is_empty());
    }

    // Both `data: ` and `data:` prefixes are accepted, and a bare trailing
    // `\r` is not part of the payload.
    #[test]
    fn extract_sse_data_line_supports_optional_space() {
        assert_eq!(
            extract_sse_data_line("event: x\r\ndata: {\"k\":1}\r"),
            Some("{\"k\":1}".to_string())
        );
        assert_eq!(
            extract_sse_data_line("event: x\r\ndata:{\"k\":2}\r"),
            Some("{\"k\":2}".to_string())
        );
    }

    // Content accumulated before the stream ended must survive finalization.
    #[test]
    fn finalize_incomplete_stream_keeps_accumulated_content() {
        let response = finalize_incomplete_stream(
            vec![AssembledBlock::Text("partial reply".to_string())],
            StopReason::EndTurn,
            Usage::default(),
        );

        assert_eq!(response.stop_reason, StopReason::EndTurn);
        assert_eq!(response.content.len(), 1);
        match &response.content[0] {
            ContentBlock::Text { text } => assert_eq!(text, "partial reply"),
            other => panic!("unexpected block: {other:?}"),
        }
    }
}
860}