Skip to main content

matrixcode_core/providers/
anthropic.rs

1use anyhow::Result;
2use async_trait::async_trait;
3use futures_util::StreamExt;
4use log::debug;
5use serde_json::{Value, json};
6use tokio::sync::mpsc;
7
8use crate::tools::ToolDefinition;
9
10use super::{
11    ChatRequest, ChatResponse, ContentBlock, Message, MessageContent, Provider, Role, StopReason,
12    StreamEvent, Usage,
13};
14
/// Chat provider that talks to an Anthropic-style `/v1/messages` endpoint.
pub struct AnthropicProvider {
    /// API key; sent as `x-api-key`, or as a Bearer token for DashScope.
    api_key: String,
    /// Model identifier placed in the request body's `model` field.
    model: String,
    /// Base URL that `/v1/messages` is appended to.
    base_url: String,
    /// HTTP client used to issue requests.
    client: reqwest::Client,
    /// Whether this is an Aliyun DashScope endpoint (requires special headers).
    is_dashscope: bool,
}
23
24impl AnthropicProvider {
25    pub fn new(api_key: String, model: String, base_url: String) -> Self {
26        let is_dashscope = base_url.contains("dashscope.aliyuncs.com");
27        Self {
28            api_key,
29            model,
30            base_url,
31            client: reqwest::Client::new(),
32            is_dashscope,
33        }
34    }
35
36    fn convert_messages(&self, messages: &[Message]) -> Vec<Value> {
37        messages
38            .iter()
39            .filter(|m| m.role != Role::System)
40            .map(|m| {
41                let role = match m.role {
42                    Role::User | Role::Tool => "user",
43                    Role::Assistant => "assistant",
44                    Role::System => unreachable!(),
45                };
46
47                let content = match &m.content {
48                    MessageContent::Text(text) => json!(text),
49                    MessageContent::Blocks(blocks) => {
50                        let converted: Vec<Value> = blocks
51                            .iter()
52                            .map(|b| match b {
53                                ContentBlock::Text { text } => json!({"type": "text", "text": text}),
54                                ContentBlock::ToolUse { id, name, input } => {
55                                    json!({"type": "tool_use", "id": id, "name": name, "input": input})
56                                }
57                                ContentBlock::ToolResult { tool_use_id, content } => {
58                                    json!({"type": "tool_result", "tool_use_id": tool_use_id, "content": content})
59                                }
60                                ContentBlock::Thinking { thinking, signature } => {
61                                    let mut obj = json!({"type": "thinking", "thinking": thinking});
62                                    if let Some(sig) = signature {
63                                        obj["signature"] = json!(sig);
64                                    }
65                                    obj
66                                }
67                                ContentBlock::ServerToolUse { id, name, input } => {
68                                    json!({"type": "server_tool_use", "id": id, "name": name, "input": input})
69                                }
70                                ContentBlock::WebSearchResult { tool_use_id, content } => {
71                                    json!({"type": "web_search_tool_result", "tool_use_id": tool_use_id, "content": content})
72                                }
73                            })
74                            .collect();
75                        json!(converted)
76                    }
77                };
78
79                json!({"role": role, "content": content})
80            })
81            .collect()
82    }
83
84    /// Convert tools with caching control for Anthropic prompt caching.
85    fn convert_tools_with_caching(&self, tools: &[ToolDefinition], enable_caching: bool) -> Vec<Value> {
86        let mut converted: Vec<Value> = tools
87            .iter()
88            .map(|t| {
89                json!({
90                    "name": t.name,
91                    "description": t.description,
92                    "input_schema": t.parameters,
93                })
94            })
95            .collect();
96        
97        // Add cache_control to the last tool definition for tools caching
98        if enable_caching && !converted.is_empty() {
99            let last_idx = converted.len() - 1;
100            if let Some(obj) = converted[last_idx].as_object_mut() {
101                obj.insert("cache_control".to_string(), json!({"type": "ephemeral"}));
102            }
103        }
104        
105        converted
106    }
107
108    /// Build the base JSON body shared by streaming and non-streaming requests.
109    fn build_body(&self, request: &ChatRequest) -> Value {
110        let mut body = json!({
111            "model": self.model,
112            "max_tokens": request.max_tokens,
113            "messages": self.convert_messages(&request.messages),
114        });
115
116        // Add prompt caching for system prompt (Anthropic-specific)
117        if request.enable_caching && !self.is_dashscope {
118            if let Some(system) = &request.system {
119                // System prompt caching: add cache_control to enable caching
120                body["system"] = json!([
121                    {
122                        "type": "text",
123                        "text": system,
124                        "cache_control": {"type": "ephemeral"}
125                    }
126                ]);
127            }
128        } else if let Some(system) = &request.system {
129            body["system"] = json!(system);
130        }
131
132        if !request.tools.is_empty() {
133            let tools = self.convert_tools_with_caching(&request.tools, request.enable_caching && !self.is_dashscope);
134            body["tools"] = json!(tools);
135        }
136
137        if !request.server_tools.is_empty() {
138            body["tools"] = json!(body["tools"]
139                .as_array()
140                .map(|t| {
141                    let mut tools = t.clone();
142                    for st in &request.server_tools {
143                        tools.push(serde_json::to_value(st).unwrap_or_default());
144                    }
145                    tools
146                })
147                .unwrap_or_else(|| request.server_tools.iter().map(|st| serde_json::to_value(st).unwrap_or_default()).collect()));
148        }
149
150        // DashScope does not support Anthropic's extended thinking feature
151        if request.think && !self.is_dashscope {
152            body["thinking"] = thinking_config(&self.model);
153        }
154
155        body
156    }
157}
158
159/// Models that require the new `adaptive` thinking mode instead of the
160/// legacy `enabled`+`budget_tokens` form. Conservative allow-list: if we
161/// don't recognize the name we default to the legacy shape (which older
162/// models and most third-party gateways understand).
163fn thinking_config(model: &str) -> Value {
164    let adaptive = model.contains("opus-4-7") || model.contains("opus-4.7");
165    if adaptive {
166        json!({"type": "adaptive"})
167    } else {
168        json!({"type": "enabled", "budget_tokens": 2048})
169    }
170}
171
172#[async_trait]
173impl Provider for AnthropicProvider {
174    fn context_size(&self) -> Option<u32> {
175        context_window_for(&self.model)
176    }
177
178    fn clone_box(&self) -> Box<dyn Provider> {
179        Box::new(Self {
180            api_key: self.api_key.clone(),
181            model: self.model.clone(),
182            base_url: self.base_url.clone(),
183            client: reqwest::Client::new(),
184            is_dashscope: self.is_dashscope,
185        })
186    }
187
188    async fn chat(&self, request: ChatRequest) -> Result<ChatResponse> {
189        let body = self.build_body(&request);
190
191        let url = format!("{}/v1/messages", self.base_url);
192        let mut req = self
193            .client
194            .post(&url)
195            .header("User-Agent", "curl/8.0")
196            .json(&body);
197
198        // DashScope uses Bearer auth
199        if self.is_dashscope {
200            req = req.header("Authorization", format!("Bearer {}", self.api_key));
201        } else {
202            req = req.header("x-api-key", &self.api_key)
203                .header("anthropic-version", "2023-06-01")
204                .header("anthropic-beta", "prompt-caching-2024-07-31");
205        }
206
207        let response = req.send().await?;
208
209        let status = response.status();
210        let response_body: Value = response.json().await?;
211
212        if !status.is_success() {
213            let err_msg = response_body["error"]["message"]
214                .as_str()
215                .unwrap_or("unknown error");
216            anyhow::bail!("Anthropic API error ({}): {}", status, err_msg);
217        }
218
219        let stop_reason = match response_body["stop_reason"].as_str() {
220            Some("tool_use") => StopReason::ToolUse,
221            Some("max_tokens") => StopReason::MaxTokens,
222            _ => StopReason::EndTurn,
223        };
224
225        let content = response_body["content"]
226            .as_array()
227            .unwrap_or(&vec![])
228            .iter()
229            .filter_map(|block| match block["type"].as_str()? {
230                "text" => Some(ContentBlock::Text {
231                    text: block["text"].as_str()?.to_string(),
232                }),
233                "tool_use" => Some(ContentBlock::ToolUse {
234                    id: block["id"].as_str()?.to_string(),
235                    name: block["name"].as_str()?.to_string(),
236                    input: block["input"].clone(),
237                }),
238                "thinking" => Some(ContentBlock::Thinking {
239                    thinking: block["thinking"].as_str()?.to_string(),
240                    signature: block["signature"].as_str().map(String::from),
241                }),
242                "server_tool_use" => Some(ContentBlock::ServerToolUse {
243                    id: block["id"].as_str()?.to_string(),
244                    name: block["name"].as_str()?.to_string(),
245                    input: block["input"].clone(),
246                }),
247                "web_search_tool_result" => {
248                    let tool_use_id = block["tool_use_id"].as_str()?.to_string();
249                    let content = parse_web_search_content(&block["content"]);
250                    Some(ContentBlock::WebSearchResult {
251                        tool_use_id,
252                        content,
253                    })
254                }
255                _ => None,
256            })
257            .collect();
258
259        Ok(ChatResponse {
260            content,
261            stop_reason,
262            usage: parse_usage(&response_body["usage"]),
263        })
264    }
265
266    async fn chat_stream(&self, request: ChatRequest) -> Result<mpsc::Receiver<StreamEvent>> {
267        let mut body = self.build_body(&request);
268        body["stream"] = json!(true);
269
270        let url = format!("{}/v1/messages", self.base_url);
271        let mut req = self
272            .client
273            .post(&url)
274            .header("User-Agent", "curl/8.0")
275            .json(&body);
276
277        // DashScope uses Bearer auth and requires SSE header for streaming
278        if self.is_dashscope {
279            req = req
280                .header("Authorization", format!("Bearer {}", self.api_key))
281                .header("X-DashScope-SSE", "enable");
282        } else {
283            req = req.header("x-api-key", &self.api_key)
284                .header("anthropic-version", "2023-06-01")
285                .header("anthropic-beta", "prompt-caching-2024-07-31");
286        }
287
288        let response = req.send().await?;
289
290        if !response.status().is_success() {
291            let status = response.status();
292            let text = response.text().await.unwrap_or_default();
293            anyhow::bail!("Anthropic API error ({}): {}", status, text);
294        }
295
296        let (tx, rx) = mpsc::channel(64);
297        tokio::spawn(async move {
298            let mut stream = response.bytes_stream();
299            let mut buffer = String::new();
300            let mut sent_first_byte = false;
301
302            // In-flight block assembly: index → partial data
303            let mut blocks: Vec<AssembledBlock> = Vec::new();
304            let mut stop_reason = StopReason::EndTurn;
305            let mut usage = Usage::default();
306
307            while let Some(chunk) = stream.next().await {
308                let chunk = match chunk {
309                    Ok(c) => c,
310                    Err(e) => {
311                        let _ = tx
312                            .send(StreamEvent::Error(format!("stream read error: {}", e)))
313                            .await;
314                        return;
315                    }
316                };
317
318                if !sent_first_byte {
319                    sent_first_byte = true;
320                    let _ = tx.send(StreamEvent::FirstByte).await;
321                }
322
323                buffer.push_str(&String::from_utf8_lossy(&chunk));
324
325                while let Some(frame) = take_next_sse_frame(&mut buffer) {
326                    if handle_sse_frame(
327                        &frame,
328                        &mut blocks,
329                        &mut stop_reason,
330                        &mut usage,
331                        &tx,
332                    )
333                    .await
334                    {
335                        return;
336                    }
337                }
338            }
339
340            if let Some(frame) = take_trailing_sse_frame(&mut buffer)
341                && handle_sse_frame(&frame, &mut blocks, &mut stop_reason, &mut usage, &tx).await {
342                    return;
343                }
344
345            if sent_first_byte {
346                debug!(
347                    "stream ended without explicit message_stop; finalizing best-effort"
348                );
349                let _ = tx
350                    .send(StreamEvent::Done(finalize_incomplete_stream(
351                        std::mem::take(&mut blocks),
352                        stop_reason,
353                        usage,
354                    )))
355                    .await;
356            } else {
357                let _ = tx
358                    .send(StreamEvent::Error(
359                        "stream ended before any events were received".to_string(),
360                    ))
361                    .await;
362            }
363        });
364
365        Ok(rx)
366    }
367}
368
/// Remove and return the first complete SSE frame from `buffer`.
///
/// A frame ends at the earliest blank-line delimiter, either `\n\n` or
/// `\r\n\r\n`. The delimiter is consumed but not returned. Yields `None`, and
/// leaves `buffer` untouched, when no delimiter is present yet.
fn take_next_sse_frame(buffer: &mut String) -> Option<String> {
    // Candidate delimiters with their byte widths; the earliest match wins.
    let (start, width) = [("\n\n", 2usize), ("\r\n\r\n", 4usize)]
        .iter()
        .filter_map(|&(delim, width)| buffer.find(delim).map(|at| (at, width)))
        .min_by_key(|&(at, _)| at)?;

    // Keep what follows the delimiter, cut the delimiter off the frame, then
    // swap so `buffer` holds the remainder and the frame is handed back.
    let remainder = buffer.split_off(start + width);
    buffer.truncate(start);
    Some(std::mem::replace(buffer, remainder))
}
389
/// Drain `buffer` and return its whitespace-trimmed contents as a final,
/// unterminated frame. Yields `None` when nothing but whitespace remains.
/// `buffer` is always left empty.
fn take_trailing_sse_frame(buffer: &mut String) -> Option<String> {
    let frame = {
        // `trim` strips every kind of surrounding whitespace, `\r` included.
        let trimmed = buffer.trim();
        if trimmed.is_empty() {
            None
        } else {
            Some(trimmed.to_owned())
        }
    };
    buffer.clear();
    frame
}
399
/// Extract the `data` payload of one SSE frame.
///
/// Both `data: ` (Anthropic) and `data:` (DashScope) prefixes are accepted; at
/// most one space after the colon is stripped. When a frame carries several
/// `data:` lines they are joined with `\n`, as the SSE format prescribes, so a
/// payload split over multiple lines is reassembled instead of truncated to
/// its first line. Returns `None` when the frame has no `data:` line.
fn extract_sse_data_line(frame: &str) -> Option<String> {
    let mut parts: Vec<&str> = Vec::new();
    for line in frame.lines() {
        // `lines` leaves a lone trailing `\r` on an unterminated last line.
        let line = line.trim_end_matches('\r');
        let Some(rest) = line.strip_prefix("data:") else {
            continue;
        };
        parts.push(rest.strip_prefix(' ').unwrap_or(rest));
    }

    if parts.is_empty() {
        None
    } else {
        Some(parts.join("\n"))
    }
}
413
414async fn handle_sse_frame(
415    frame: &str,
416    blocks: &mut Vec<AssembledBlock>,
417    stop_reason: &mut StopReason,
418    usage: &mut Usage,
419    tx: &mpsc::Sender<StreamEvent>,
420) -> bool {
421    let Some(data_line) = extract_sse_data_line(frame) else {
422        return false;
423    };
424
425    let evt: Value = match serde_json::from_str(&data_line) {
426        Ok(v) => v,
427        Err(_) => return false,
428    };
429
430    handle_sse_event(evt, blocks, stop_reason, usage, tx).await
431}
432
433async fn handle_sse_event(
434    evt: Value,
435    blocks: &mut Vec<AssembledBlock>,
436    stop_reason: &mut StopReason,
437    usage: &mut Usage,
438    tx: &mpsc::Sender<StreamEvent>,
439) -> bool {
440    match evt["type"].as_str().unwrap_or("") {
441        "message_start" => {
442            // Initial usage payload — `input_tokens` is final
443            // (they don't grow during streaming) but
444            // `output_tokens` starts near 0 and is updated by
445            // subsequent `message_delta` events.
446            *usage = merge_usage(usage.clone(), &evt["message"]["usage"]);
447            debug!(
448                "message_start: usage_json={}",
449                serde_json::to_string(&evt["message"]["usage"]).unwrap_or_default()
450            );
451            debug!(
452                "message_start parsed: input={}, output={}, cache_read={}, cache_created={}",
453                usage.input_tokens, usage.output_tokens,
454                usage.cache_read_input_tokens, usage.cache_creation_input_tokens
455            );
456        }
457        "content_block_start" => {
458            let idx = evt["index"].as_u64().unwrap_or(0) as usize;
459            let block = &evt["content_block"];
460            let kind = block["type"].as_str().unwrap_or("");
461            while blocks.len() <= idx {
462                blocks.push(AssembledBlock::default());
463            }
464            match kind {
465                "text" => {
466                    blocks[idx] = AssembledBlock::Text(String::new());
467                }
468                "thinking" => {
469                    blocks[idx] = AssembledBlock::Thinking {
470                        text: String::new(),
471                        signature: None,
472                    };
473                }
474                "tool_use" => {
475                    let id = block["id"].as_str().unwrap_or("").to_string();
476                    let name = block["name"].as_str().unwrap_or("").to_string();
477                    blocks[idx] = AssembledBlock::ToolUse {
478                        id: id.clone(),
479                        name: name.clone(),
480                        input_json: String::new(),
481                    };
482                    let _ = tx.send(StreamEvent::ToolUseStart { id, name }).await;
483                }
484                "server_tool_use" => {
485                    let id = block["id"].as_str().unwrap_or("").to_string();
486                    let name = block["name"].as_str().unwrap_or("").to_string();
487                    blocks[idx] = AssembledBlock::ServerToolUse {
488                        id: id.clone(),
489                        name: name.clone(),
490                        input_json: String::new(),
491                    };
492                    let _ = tx.send(StreamEvent::ToolUseStart { id, name }).await;
493                }
494                "web_search_tool_result" => {
495                    let tool_use_id = block["tool_use_id"].as_str().unwrap_or("").to_string();
496                    blocks[idx] = AssembledBlock::WebSearchResult {
497                        tool_use_id,
498                        content_json: String::new(),
499                    };
500                }
501                _ => {}
502            }
503        }
504        "content_block_delta" => {
505            let idx = evt["index"].as_u64().unwrap_or(0) as usize;
506            let delta = &evt["delta"];
507            let dt = delta["type"].as_str().unwrap_or("");
508            if idx >= blocks.len() {
509                return false;
510            }
511            match (dt, &mut blocks[idx]) {
512                ("text_delta", AssembledBlock::Text(buf)) => {
513                    if let Some(t) = delta["text"].as_str() {
514                        buf.push_str(t);
515                        let _ = tx.send(StreamEvent::TextDelta(t.to_string())).await;
516                    }
517                }
518                ("thinking_delta", AssembledBlock::Thinking { text, .. }) => {
519                    if let Some(t) = delta["thinking"].as_str() {
520                        text.push_str(t);
521                        let _ = tx.send(StreamEvent::ThinkingDelta(t.to_string())).await;
522                    }
523                }
524                ("signature_delta", AssembledBlock::Thinking { signature, .. }) => {
525                    if let Some(s) = delta["signature"].as_str() {
526                        signature.get_or_insert_with(String::new).push_str(s);
527                    }
528                }
529                ("input_json_delta", AssembledBlock::ToolUse { input_json, .. }) => {
530                    if let Some(p) = delta["partial_json"].as_str() {
531                        input_json.push_str(p);
532                        let _ = tx
533                            .send(StreamEvent::ToolInputDelta {
534                                bytes_so_far: input_json.len(),
535                            })
536                            .await;
537                    }
538                }
539                ("input_json_delta", AssembledBlock::ServerToolUse { input_json, .. }) => {
540                    if let Some(p) = delta["partial_json"].as_str() {
541                        input_json.push_str(p);
542                        let _ = tx
543                            .send(StreamEvent::ToolInputDelta {
544                                bytes_so_far: input_json.len(),
545                            })
546                            .await;
547                    }
548                }
549                _ => {}
550            }
551        }
552        "message_delta" => {
553            if let Some(sr) = evt["delta"]["stop_reason"].as_str() {
554                *stop_reason = match sr {
555                    "tool_use" => StopReason::ToolUse,
556                    "max_tokens" => StopReason::MaxTokens,
557                    _ => StopReason::EndTurn,
558                };
559            }
560            // `usage` on message_delta reflects cumulative
561            // counts for the current message — most notably
562            // the final `output_tokens`.
563            *usage = merge_usage(usage.clone(), &evt["usage"]);
564            debug!(
565                "message_delta: input={}, output={}, cache_read={}, cache_created={}",
566                usage.input_tokens, usage.output_tokens,
567                usage.cache_read_input_tokens, usage.cache_creation_input_tokens
568            );
569        }
570        "message_stop" => {
571            debug!(
572                "Message completed: stop_reason={}, usage={}",
573                match *stop_reason {
574                    StopReason::EndTurn => "end_turn",
575                    StopReason::ToolUse => "tool_use",
576                    StopReason::MaxTokens => "max_tokens",
577                },
578                usage.output_tokens
579            );
580            debug!(
581                "message_stop final usage: cache_read={}, cache_created={}",
582                usage.cache_read_input_tokens, usage.cache_creation_input_tokens
583            );
584            let _ = tx
585                .send(StreamEvent::Done(finalize_incomplete_stream(
586                    std::mem::take(blocks),
587                    stop_reason.clone(),
588                    usage.clone(),
589                )))
590                .await;
591            return true;
592        }
593        "error" => {
594            let msg = evt["error"]["message"]
595                .as_str()
596                .unwrap_or("unknown stream error")
597                .to_string();
598            let _ = tx.send(StreamEvent::Error(msg)).await;
599            return true;
600        }
601        _ => {}
602    }
603
604    false
605}
606
607fn build_chat_response(
608    blocks: Vec<AssembledBlock>,
609    stop_reason: StopReason,
610    usage: Usage,
611) -> ChatResponse {
612    let content: Vec<ContentBlock> = blocks.into_iter().filter_map(|b| b.finish()).collect();
613    ChatResponse {
614        content,
615        stop_reason,
616        usage,
617    }
618}
619
620fn finalize_incomplete_stream(
621    blocks: Vec<AssembledBlock>,
622    stop_reason: StopReason,
623    usage: Usage,
624) -> ChatResponse {
625    build_chat_response(blocks, stop_reason, usage)
626}
627
/// A content block being assembled from streaming events, one per block index.
#[derive(Default)]
enum AssembledBlock {
    /// Placeholder for an index whose block has not started (or whose type is
    /// not recognised); produces no content when finished.
    #[default]
    Empty,
    /// Text accumulated from `text_delta` events.
    Text(String),
    /// Thinking text plus its signature, accumulated from `thinking_delta`
    /// and `signature_delta` events.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Client tool call; `input_json` collects the raw `partial_json`
    /// fragments and is parsed only when the block is finished.
    ToolUse {
        id: String,
        name: String,
        input_json: String,
    },
    /// Server tool call; assembled the same way as `ToolUse`.
    ServerToolUse {
        id: String,
        name: String,
        input_json: String,
    },
    /// Web search result; `content_json` holds the raw JSON text of the
    /// result payload, parsed when the block is finished.
    WebSearchResult {
        tool_use_id: String,
        content_json: String,
    },
}
652
653impl AssembledBlock {
654    fn finish(self) -> Option<ContentBlock> {
655        match self {
656            AssembledBlock::Empty => None,
657            AssembledBlock::Text(text) => Some(ContentBlock::Text { text }),
658            AssembledBlock::Thinking { text, signature } => Some(ContentBlock::Thinking {
659                thinking: text,
660                signature,
661            }),
662            AssembledBlock::ToolUse {
663                id,
664                name,
665                input_json,
666            } => {
667                let input: Value = if input_json.is_empty() {
668                    json!({})
669                } else {
670                    serde_json::from_str(&input_json).unwrap_or(json!({}))
671                };
672                Some(ContentBlock::ToolUse { id, name, input })
673            }
674            AssembledBlock::ServerToolUse {
675                id,
676                name,
677                input_json,
678            } => {
679                let input: Value = if input_json.is_empty() {
680                    json!({})
681                } else {
682                    serde_json::from_str(&input_json).unwrap_or(json!({}))
683                };
684                Some(ContentBlock::ServerToolUse { id, name, input })
685            }
686            AssembledBlock::WebSearchResult {
687                tool_use_id,
688                content_json,
689            } => {
690                let content: Value = if content_json.is_empty() {
691                    json!({"results": []})
692                } else {
693                    serde_json::from_str(&content_json).unwrap_or(json!({"results": []}))
694                };
695                Some(ContentBlock::WebSearchResult {
696                    tool_use_id,
697                    content: parse_web_search_content(&content),
698                })
699            }
700        }
701    }
702}
703
704/// Parse the provider's `usage` blob (non-streaming response) into our
705/// internal `Usage` struct. Missing fields default to 0.
706fn parse_usage(u: &Value) -> Usage {
707    Usage {
708        input_tokens: u["input_tokens"].as_u64().unwrap_or(0) as u32,
709        output_tokens: u["output_tokens"].as_u64().unwrap_or(0) as u32,
710        cache_creation_input_tokens: u["cache_creation_input_tokens"].as_u64().unwrap_or(0) as u32,
711        cache_read_input_tokens: u["cache_read_input_tokens"].as_u64().unwrap_or(0) as u32,
712    }
713}
714
715/// Merge a fresh usage payload into the accumulated one. Non-zero new values
716/// override prior ones — this matches the streaming protocol where
717/// `message_start` gives input counts and `message_delta` updates output.
718fn merge_usage(mut acc: Usage, u: &Value) -> Usage {
719    let fresh = parse_usage(u);
720    if fresh.input_tokens > 0 {
721        acc.input_tokens = fresh.input_tokens;
722    }
723    if fresh.output_tokens > 0 {
724        acc.output_tokens = fresh.output_tokens;
725    }
726    if fresh.cache_creation_input_tokens > 0 {
727        acc.cache_creation_input_tokens = fresh.cache_creation_input_tokens;
728    }
729    if fresh.cache_read_input_tokens > 0 {
730        acc.cache_read_input_tokens = fresh.cache_read_input_tokens;
731    }
732    acc
733}
734
735/// Parse web search content from the API response.
736fn parse_web_search_content(value: &serde_json::Value) -> crate::providers::WebSearchContent {
737    let results = value["results"]
738        .as_array()
739        .map(|arr| {
740            arr.iter()
741                .filter_map(|item| {
742                    Some(crate::providers::WebSearchResultItem {
743                        title: item["title"].as_str().map(String::from),
744                        url: item["url"].as_str()?.to_string(),
745                        encrypted_content: item["encrypted_content"].as_str().map(String::from),
746                        snippet: item["snippet"].as_str().map(String::from),
747                    })
748                })
749                .collect()
750        })
751        .unwrap_or_default();
752
753    crate::providers::WebSearchContent { results }
754}
755
/// Best-effort mapping from an Anthropic-ish model name to its context
/// window size, in tokens.
///
/// A positive integer in the `CONTEXT_SIZE` env variable takes precedence so
/// users can pin a value when running through a proxy gateway (e.g. Kimi via
/// an Anthropic-shaped endpoint). Otherwise the lower-cased model name is
/// matched against known families. Unknown names fall back to 128K, so this
/// currently always returns `Some`, which keeps context usage displayed.
fn context_window_for(model: &str) -> Option<u32> {
    let pinned = std::env::var("CONTEXT_SIZE")
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .filter(|&n| n > 0);
    if pinned.is_some() {
        return pinned;
    }

    let m = model.to_ascii_lowercase();
    let has_any = |needles: &[&str]| needles.iter().any(|needle| m.contains(*needle));

    // Checked top to bottom; the first matching family wins.
    let window = if has_any(&["[1m]", "opus-4-7", "opus-4.7"]) {
        // Claude Opus 4.7 and the 1M-context variants expose a 1M window.
        1_000_000
    } else if has_any(&[
        "claude-3",
        "claude-4",
        "claude-opus",
        "claude-sonnet",
        "claude-haiku",
    ]) {
        // Claude 3.x/4.x series, including family-first names such as
        // `claude-haiku-4-5`: 200K standard window.
        200_000
    } else if has_any(&["claude-2", "claude-instant"]) {
        // Claude 2 and Claude Instant: 100K.
        100_000
    } else if has_any(&["kimi", "deepseek", "glm"]) {
        // Kimi, DeepSeek and GLM via Anthropic-compatible endpoints: 128K.
        128_000
    } else if m.contains("qwen") {
        // Qwen models via Anthropic-compatible endpoints (DashScope).
        if has_any(&["qwen-max", "qwen2.5"]) {
            128_000
        } else {
            32_000
        }
    } else {
        // Unknown model: assume 128K (reasonable for modern models).
        128_000
    };

    Some(window)
}
809
/// Unit tests for the SSE framing helpers and stream finalization.
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames separated by `\r\n\r\n` are split off one at a time and the
    /// buffer ends up empty.
    #[test]
    fn take_next_sse_frame_supports_crlf_delimiter() {
        let mut buffer = concat!(
            "event: message_start\r\n",
            "data: {\"type\":\"message_start\"}\r\n\r\n",
            "data: {\"type\":\"message_stop\"}\r\n\r\n"
        )
        .to_string();

        let first = take_next_sse_frame(&mut buffer).expect("first frame");
        assert!(first.contains("message_start"));

        let second = take_next_sse_frame(&mut buffer).expect("second frame");
        assert!(second.contains("message_stop"));
        assert!(buffer.is_empty());
    }

    /// A final event lacking the blank-line terminator is still returned,
    /// with its trailing line ending trimmed.
    #[test]
    fn take_trailing_sse_frame_returns_unterminated_event() {
        let mut buffer = "data: {\"type\":\"message_stop\"}\r\n".to_string();
        let frame = take_trailing_sse_frame(&mut buffer).expect("trailing frame");
        assert_eq!(frame, "data: {\"type\":\"message_stop\"}");
        assert!(buffer.is_empty());
    }

    /// Both the `data: ` and the `data:` prefix forms are recognised.
    #[test]
    fn extract_sse_data_line_supports_optional_space() {
        assert_eq!(
            extract_sse_data_line("event: x\r\ndata: {\"k\":1}\r"),
            Some("{\"k\":1}".to_string())
        );
        assert_eq!(
            extract_sse_data_line("event: x\r\ndata:{\"k\":2}\r"),
            Some("{\"k\":2}".to_string())
        );
    }

    /// Content accumulated before the stream ended is kept in the response.
    #[test]
    fn finalize_incomplete_stream_keeps_accumulated_content() {
        let response = finalize_incomplete_stream(
            vec![AssembledBlock::Text("partial reply".to_string())],
            StopReason::EndTurn,
            Usage::default(),
        );

        assert_eq!(response.stop_reason, StopReason::EndTurn);
        assert_eq!(response.content.len(), 1);
        match &response.content[0] {
            ContentBlock::Text { text } => assert_eq!(text, "partial reply"),
            other => panic!("unexpected block: {other:?}"),
        }
    }
}
867