1use anyhow::Result;
2use async_trait::async_trait;
3use futures_util::StreamExt;
4use log::debug;
5use serde_json::{Value, json};
6use tokio::sync::mpsc;
7
8use crate::models::context_window_for;
9use crate::tools::ToolDefinition;
10
11use super::{
12 ChatRequest, ChatResponse, ContentBlock, Message, MessageContent, Provider, Role, StopReason,
13 StreamEvent, Usage,
14};
15
16pub struct AnthropicProvider {
17 api_key: String,
18 model: String,
19 base_url: String,
20 client: reqwest::Client,
21 is_dashscope: bool,
23}
24
25impl AnthropicProvider {
26 pub fn new(api_key: String, model: String, base_url: String) -> Self {
27 let is_dashscope = base_url.contains("dashscope.aliyuncs.com");
28 let client = reqwest::Client::builder()
29 .timeout(std::time::Duration::from_secs(120))
30 .connect_timeout(std::time::Duration::from_secs(10))
31 .danger_accept_invalid_certs(true)
32 .build()
33 .unwrap_or_else(|_| reqwest::Client::new());
34 Self {
35 api_key,
36 model,
37 base_url,
38 client,
39 is_dashscope,
40 }
41 }
42
43 fn convert_messages(&self, messages: &[Message]) -> Vec<Value> {
44 messages
45 .iter()
46 .filter(|m| m.role != Role::System)
47 .map(|m| {
48 let role = match m.role {
49 Role::User | Role::Tool => "user",
50 Role::Assistant => "assistant",
51 Role::System => unreachable!(),
52 };
53
54 let content = match &m.content {
55 MessageContent::Text(text) => json!(text),
56 MessageContent::Blocks(blocks) => {
57 let converted: Vec<Value> = blocks
58 .iter()
59 .map(|b| match b {
60 ContentBlock::Text { text } => json!({"type": "text", "text": text}),
61 ContentBlock::ToolUse { id, name, input } => {
62 json!({"type": "tool_use", "id": id, "name": name, "input": input})
63 }
64 ContentBlock::ToolResult { tool_use_id, content } => {
65 json!({"type": "tool_result", "tool_use_id": tool_use_id, "content": content})
66 }
67 ContentBlock::Thinking { thinking, signature } => {
68 let mut obj = json!({"type": "thinking", "thinking": thinking});
69 if let Some(sig) = signature {
70 obj["signature"] = json!(sig);
71 }
72 obj
73 }
74 ContentBlock::ServerToolUse { id, name, input } => {
75 json!({"type": "server_tool_use", "id": id, "name": name, "input": input})
76 }
77 ContentBlock::WebSearchResult { tool_use_id, content } => {
78 json!({"type": "web_search_tool_result", "tool_use_id": tool_use_id, "content": content})
79 }
80 })
81 .collect();
82 json!(converted)
83 }
84 };
85
86 json!({"role": role, "content": content})
87 })
88 .collect()
89 }
90
91 fn convert_tools_with_caching(
93 &self,
94 tools: &[ToolDefinition],
95 enable_caching: bool,
96 ) -> Vec<Value> {
97 let mut converted: Vec<Value> = tools
98 .iter()
99 .map(|t| {
100 json!({
101 "name": t.name,
102 "description": t.description,
103 "input_schema": t.parameters,
104 })
105 })
106 .collect();
107
108 if enable_caching && !converted.is_empty() {
110 let last_idx = converted.len() - 1;
111 if let Some(obj) = converted[last_idx].as_object_mut() {
112 obj.insert("cache_control".to_string(), json!({"type": "ephemeral"}));
113 }
114 }
115
116 converted
117 }
118
119 fn build_body(&self, request: &ChatRequest) -> Value {
121 let mut body = json!({
122 "model": self.model,
123 "max_tokens": request.max_tokens,
124 "messages": self.convert_messages(&request.messages),
125 });
126
127 if request.enable_caching && !self.is_dashscope {
129 if let Some(system) = &request.system {
130 body["system"] = json!([
132 {
133 "type": "text",
134 "text": system,
135 "cache_control": {"type": "ephemeral"}
136 }
137 ]);
138 }
139 } else if let Some(system) = &request.system {
140 body["system"] = json!(system);
141 }
142
143 if !request.tools.is_empty() {
144 let tools = self.convert_tools_with_caching(
145 &request.tools,
146 request.enable_caching && !self.is_dashscope,
147 );
148 body["tools"] = json!(tools);
149 }
150
151 if !request.server_tools.is_empty() {
152 body["tools"] = json!(
153 body["tools"]
154 .as_array()
155 .map(|t| {
156 let mut tools = t.clone();
157 for st in &request.server_tools {
158 tools.push(serde_json::to_value(st).unwrap_or_default());
159 }
160 tools
161 })
162 .unwrap_or_else(|| request
163 .server_tools
164 .iter()
165 .map(|st| serde_json::to_value(st).unwrap_or_default())
166 .collect())
167 );
168 }
169
170 if request.think && !self.is_dashscope {
172 let config = thinking_config(&self.model);
173 log::debug!(
174 "Adding thinking config for model {}: {:?}",
175 self.model,
176 config
177 );
178 body["thinking"] = config;
179 } else if !request.think {
180 log::debug!("Thinking disabled by request.think=false");
181 } else if self.is_dashscope {
182 log::debug!("Thinking disabled for DashScope");
183 }
184
185 body
186 }
187}
188
189fn thinking_config(model: &str) -> Value {
194 let m = model.to_lowercase();
195 let adaptive = m.contains("opus-4")
197 || m.contains("sonnet-4")
198 || m.contains("claude-4")
199 || m.contains("20250")
200 || m.contains("2025");
201 if adaptive {
202 json!({"type": "enabled", "budget_tokens": 10000})
203 } else {
204 json!({"type": "enabled", "budget_tokens": 5000})
205 }
206}
207
208#[async_trait]
209impl Provider for AnthropicProvider {
210 fn context_size(&self) -> Option<u32> {
211 context_window_for(&self.model)
212 }
213
214 fn clone_box(&self) -> Box<dyn Provider> {
215 Box::new(Self {
216 api_key: self.api_key.clone(),
217 model: self.model.clone(),
218 base_url: self.base_url.clone(),
219 client: reqwest::Client::new(),
220 is_dashscope: self.is_dashscope,
221 })
222 }
223
224 async fn chat(&self, request: ChatRequest) -> Result<ChatResponse> {
225 let body = self.build_body(&request);
226
227 let url = format!("{}/v1/messages", self.base_url);
228 let mut req = self
229 .client
230 .post(&url)
231 .header("User-Agent", "curl/8.0")
232 .json(&body);
233
234 if self.is_dashscope {
236 req = req.header("Authorization", format!("Bearer {}", self.api_key));
237 } else {
238 req = req
239 .header("x-api-key", &self.api_key)
240 .header("anthropic-version", "2025-04-15")
241 .header("anthropic-beta", "prompt-caching-2024-07-31");
242 }
243
244 let response = req.send().await?;
245
246 let status = response.status();
247 let response_body: Value = response.json().await?;
248
249 if !status.is_success() {
250 let err_msg = response_body["error"]["message"]
251 .as_str()
252 .unwrap_or("unknown error");
253 anyhow::bail!("Anthropic API error ({}): {}", status, err_msg);
254 }
255
256 let stop_reason = match response_body["stop_reason"].as_str() {
257 Some("tool_use") => StopReason::ToolUse,
258 Some("max_tokens") => StopReason::MaxTokens,
259 _ => StopReason::EndTurn,
260 };
261
262 let content = response_body["content"]
263 .as_array()
264 .unwrap_or(&vec![])
265 .iter()
266 .filter_map(|block| match block["type"].as_str()? {
267 "text" => Some(ContentBlock::Text {
268 text: block["text"].as_str()?.to_string(),
269 }),
270 "tool_use" => Some(ContentBlock::ToolUse {
271 id: block["id"].as_str()?.to_string(),
272 name: block["name"].as_str()?.to_string(),
273 input: block["input"].clone(),
274 }),
275 "thinking" => Some(ContentBlock::Thinking {
276 thinking: block["thinking"].as_str()?.to_string(),
277 signature: block["signature"].as_str().map(String::from),
278 }),
279 "server_tool_use" => Some(ContentBlock::ServerToolUse {
280 id: block["id"].as_str()?.to_string(),
281 name: block["name"].as_str()?.to_string(),
282 input: block["input"].clone(),
283 }),
284 "web_search_tool_result" => {
285 let tool_use_id = block["tool_use_id"].as_str()?.to_string();
286 let content = parse_web_search_content(&block["content"]);
287 Some(ContentBlock::WebSearchResult {
288 tool_use_id,
289 content,
290 })
291 }
292 _ => None,
293 })
294 .collect();
295
296 Ok(ChatResponse {
297 content,
298 stop_reason,
299 usage: parse_usage(&response_body["usage"]),
300 })
301 }
302
303 async fn chat_stream(&self, request: ChatRequest) -> Result<mpsc::Receiver<StreamEvent>> {
304 let mut body = self.build_body(&request);
305 body["stream"] = json!(true);
306
307 let url = format!("{}/v1/messages", self.base_url);
308 let mut req = self
309 .client
310 .post(&url)
311 .header("User-Agent", "curl/8.0")
312 .json(&body);
313
314 if self.is_dashscope {
316 req = req
317 .header("Authorization", format!("Bearer {}", self.api_key))
318 .header("X-DashScope-SSE", "enable");
319 } else {
320 req = req
321 .header("x-api-key", &self.api_key)
322 .header("anthropic-version", "2025-04-15")
323 .header("anthropic-beta", "prompt-caching-2024-07-31");
324 }
325
326 let response = req.send().await?;
327
328 if !response.status().is_success() {
329 let status = response.status();
330 let text = response.text().await.unwrap_or_default();
331 anyhow::bail!("Anthropic API error ({}): {}", status, text);
332 }
333
334 let (tx, rx) = mpsc::channel(64);
335 tokio::spawn(async move {
336 let mut stream = response.bytes_stream();
337 let mut buffer = String::new();
338 let mut sent_first_byte = false;
339
340 let mut blocks: Vec<AssembledBlock> = Vec::new();
342 let mut stop_reason = StopReason::EndTurn;
343 let mut usage = Usage::default();
344
345 while let Some(chunk) = stream.next().await {
346 let chunk = match chunk {
347 Ok(c) => c,
348 Err(e) => {
349 let _ = tx
350 .send(StreamEvent::Error(format!("stream read error: {}", e)))
351 .await;
352 return;
353 }
354 };
355
356 if !sent_first_byte {
357 sent_first_byte = true;
358 let _ = tx.send(StreamEvent::FirstByte).await;
359 }
360
361 buffer.push_str(&String::from_utf8_lossy(&chunk));
362
363 while let Some(frame) = take_next_sse_frame(&mut buffer) {
364 if handle_sse_frame(&frame, &mut blocks, &mut stop_reason, &mut usage, &tx)
365 .await
366 {
367 return;
368 }
369 }
370 }
371
372 if let Some(frame) = take_trailing_sse_frame(&mut buffer)
373 && handle_sse_frame(&frame, &mut blocks, &mut stop_reason, &mut usage, &tx).await
374 {
375 return;
376 }
377
378 if sent_first_byte {
379 debug!("stream ended without explicit message_stop; finalizing best-effort");
380 let _ = tx
381 .send(StreamEvent::Done(finalize_incomplete_stream(
382 std::mem::take(&mut blocks),
383 stop_reason,
384 usage,
385 )))
386 .await;
387 } else {
388 let _ = tx
389 .send(StreamEvent::Error(
390 "stream ended before any events were received".to_string(),
391 ))
392 .await;
393 }
394 });
395
396 Ok(rx)
397 }
398}
399
/// Removes and returns the first complete SSE frame from `buffer`.
///
/// A frame ends at the earliest blank-line delimiter, either `\n\n` or
/// `\r\n\r\n`. The delimiter is consumed but not included in the returned
/// frame. Returns `None`, leaving `buffer` untouched, when no delimiter is
/// present yet.
fn take_next_sse_frame(buffer: &mut String) -> Option<String> {
    let (start, delimiter_len) = ["\n\n", "\r\n\r\n"]
        .iter()
        .filter_map(|delimiter| buffer.find(delimiter).map(|at| (at, delimiter.len())))
        .min_by_key(|&(at, _)| at)?;

    let frame = buffer[..start].to_owned();
    buffer.drain(..start + delimiter_len);
    Some(frame)
}
420
/// Drains whatever is left in `buffer` as a final, unterminated frame.
///
/// Surrounding whitespace (including any `\r` / `\n`) is stripped. `buffer` is
/// always emptied; `None` is returned when nothing but whitespace remained.
fn take_trailing_sse_frame(buffer: &mut String) -> Option<String> {
    // `trim` already removes trailing carriage returns.
    let remainder = buffer.trim().to_owned();
    buffer.clear();
    (!remainder.is_empty()).then_some(remainder)
}
426
/// Extracts the `data` payload of one SSE frame.
///
/// Every line starting with `data:` contributes its value, with one optional
/// leading space removed. Multiple `data:` lines are joined with `\n`, as the
/// server-sent events format prescribes, instead of keeping only the first
/// one. Other fields (`event:`, `id:`, comments) are ignored.
///
/// Returns `None` when the frame has no `data:` line.
fn extract_sse_data_line(frame: &str) -> Option<String> {
    let values: Vec<&str> = frame
        .lines()
        .filter_map(|line| {
            // `lines` strips "\r\n" but not a bare trailing "\r".
            let rest = line.trim_end_matches('\r').strip_prefix("data:")?;
            Some(rest.strip_prefix(' ').unwrap_or(rest))
        })
        .collect();

    if values.is_empty() {
        None
    } else {
        Some(values.join("\n"))
    }
}
440
441async fn handle_sse_frame(
442 frame: &str,
443 blocks: &mut Vec<AssembledBlock>,
444 stop_reason: &mut StopReason,
445 usage: &mut Usage,
446 tx: &mpsc::Sender<StreamEvent>,
447) -> bool {
448 let Some(data_line) = extract_sse_data_line(frame) else {
449 return false;
450 };
451
452 let evt: Value = match serde_json::from_str(&data_line) {
453 Ok(v) => v,
454 Err(_) => return false,
455 };
456
457 handle_sse_event(evt, blocks, stop_reason, usage, tx).await
458}
459
460async fn handle_sse_event(
461 evt: Value,
462 blocks: &mut Vec<AssembledBlock>,
463 stop_reason: &mut StopReason,
464 usage: &mut Usage,
465 tx: &mpsc::Sender<StreamEvent>,
466) -> bool {
467 match evt["type"].as_str().unwrap_or("") {
468 "message_start" => {
469 *usage = merge_usage(usage.clone(), &evt["message"]["usage"]);
474 debug!(
475 "message_start: usage_json={}",
476 serde_json::to_string(&evt["message"]["usage"]).unwrap_or_default()
477 );
478 debug!(
479 "message_start parsed: input={}, output={}, cache_read={}, cache_created={}",
480 usage.input_tokens,
481 usage.output_tokens,
482 usage.cache_read_input_tokens,
483 usage.cache_creation_input_tokens
484 );
485 let _ = tx
487 .send(StreamEvent::Usage {
488 output_tokens: usage.output_tokens,
489 })
490 .await;
491 }
492 "content_block_start" => {
493 let idx = evt["index"].as_u64().unwrap_or(0) as usize;
494 let block = &evt["content_block"];
495 let kind = block["type"].as_str().unwrap_or("");
496 while blocks.len() <= idx {
497 blocks.push(AssembledBlock::default());
498 }
499 match kind {
500 "text" => {
501 blocks[idx] = AssembledBlock::Text(String::new());
502 }
503 "thinking" => {
504 blocks[idx] = AssembledBlock::Thinking {
505 text: String::new(),
506 signature: None,
507 };
508 }
509 "tool_use" | "server_tool_use" => {
510 let id = block["id"].as_str().unwrap_or_default();
511 let name = block["name"].as_str().unwrap_or_default();
512 let is_server = kind == "server_tool_use";
513 blocks[idx] = if is_server {
514 AssembledBlock::ServerToolUse {
515 id: id.into(),
516 name: name.into(),
517 input_json: String::new(),
518 }
519 } else {
520 AssembledBlock::ToolUse {
521 id: id.into(),
522 name: name.into(),
523 input_json: String::new(),
524 }
525 };
526 let _ = tx
527 .send(StreamEvent::ToolUseStart {
528 id: id.into(),
529 name: name.into(),
530 })
531 .await;
532 }
533 "web_search_tool_result" => {
534 let tool_use_id = block["tool_use_id"].as_str().unwrap_or("").to_string();
535 blocks[idx] = AssembledBlock::WebSearchResult {
536 tool_use_id,
537 content_json: String::new(),
538 };
539 }
540 _ => {}
541 }
542 }
543 "content_block_delta" => {
544 let idx = evt["index"].as_u64().unwrap_or(0) as usize;
545 let delta = &evt["delta"];
546 let dt = delta["type"].as_str().unwrap_or("");
547 if idx >= blocks.len() {
548 return false;
549 }
550 match (dt, &mut blocks[idx]) {
551 ("text_delta", AssembledBlock::Text(buf)) => {
552 if let Some(t) = delta["text"].as_str() {
553 buf.push_str(t);
554 let _ = tx.send(StreamEvent::TextDelta(t.to_string())).await;
555 }
556 }
557 ("thinking_delta", AssembledBlock::Thinking { text, .. }) => {
558 if let Some(t) = delta["thinking"].as_str() {
559 text.push_str(t);
560 log::debug!("Received thinking_delta: {} chars", t.len());
561 let _ = tx.send(StreamEvent::ThinkingDelta(t.to_string())).await;
562 }
563 }
564 ("signature_delta", AssembledBlock::Thinking { signature, .. }) => {
565 if let Some(s) = delta["signature"].as_str() {
566 signature.get_or_insert_with(String::new).push_str(s);
567 }
568 }
569 ("input_json_delta", AssembledBlock::ToolUse { input_json, .. }) => {
570 if let Some(p) = delta["partial_json"].as_str() {
571 input_json.push_str(p);
572 let _ = tx
573 .send(StreamEvent::ToolInputDelta {
574 bytes_so_far: input_json.len(),
575 })
576 .await;
577 }
578 }
579 ("input_json_delta", AssembledBlock::ServerToolUse { input_json, .. }) => {
580 if let Some(p) = delta["partial_json"].as_str() {
581 input_json.push_str(p);
582 let _ = tx
583 .send(StreamEvent::ToolInputDelta {
584 bytes_so_far: input_json.len(),
585 })
586 .await;
587 }
588 }
589 _ => {}
590 }
591 }
592 "message_delta" => {
593 if let Some(sr) = evt["delta"]["stop_reason"].as_str() {
594 *stop_reason = match sr {
595 "tool_use" => StopReason::ToolUse,
596 "max_tokens" => StopReason::MaxTokens,
597 _ => StopReason::EndTurn,
598 };
599 }
600 *usage = merge_usage(usage.clone(), &evt["usage"]);
604 debug!(
605 "message_delta: input={}, output={}, cache_read={}, cache_created={}",
606 usage.input_tokens,
607 usage.output_tokens,
608 usage.cache_read_input_tokens,
609 usage.cache_creation_input_tokens
610 );
611 let _ = tx
613 .send(StreamEvent::Usage {
614 output_tokens: usage.output_tokens,
615 })
616 .await;
617 }
618 "message_stop" => {
619 debug!(
620 "Message completed: stop_reason={}, usage={}",
621 match *stop_reason {
622 StopReason::EndTurn => "end_turn",
623 StopReason::ToolUse => "tool_use",
624 StopReason::MaxTokens => "max_tokens",
625 },
626 usage.output_tokens
627 );
628 debug!(
629 "message_stop final usage: cache_read={}, cache_created={}",
630 usage.cache_read_input_tokens, usage.cache_creation_input_tokens
631 );
632 let _ = tx
633 .send(StreamEvent::Done(finalize_incomplete_stream(
634 std::mem::take(blocks),
635 stop_reason.clone(),
636 usage.clone(),
637 )))
638 .await;
639 return true;
640 }
641 "error" => {
642 let msg = evt["error"]["message"]
643 .as_str()
644 .unwrap_or("unknown stream error")
645 .to_string();
646 let _ = tx.send(StreamEvent::Error(msg)).await;
647 return true;
648 }
649 _ => {}
650 }
651
652 false
653}
654
655fn build_chat_response(
656 blocks: Vec<AssembledBlock>,
657 stop_reason: StopReason,
658 usage: Usage,
659) -> ChatResponse {
660 let content: Vec<ContentBlock> = blocks.into_iter().filter_map(|b| b.finish()).collect();
661 ChatResponse {
662 content,
663 stop_reason,
664 usage,
665 }
666}
667
668fn finalize_incomplete_stream(
669 blocks: Vec<AssembledBlock>,
670 stop_reason: StopReason,
671 usage: Usage,
672) -> ChatResponse {
673 build_chat_response(blocks, stop_reason, usage)
674}
675
676#[derive(Default)]
677enum AssembledBlock {
678 #[default]
679 Empty,
680 Text(String),
681 Thinking {
682 text: String,
683 signature: Option<String>,
684 },
685 ToolUse {
686 id: String,
687 name: String,
688 input_json: String,
689 },
690 ServerToolUse {
691 id: String,
692 name: String,
693 input_json: String,
694 },
695 WebSearchResult {
696 tool_use_id: String,
697 content_json: String,
698 },
699}
700
701impl AssembledBlock {
702 fn finish(self) -> Option<ContentBlock> {
703 match self {
704 AssembledBlock::Empty => None,
705 AssembledBlock::Text(text) => Some(ContentBlock::Text { text }),
706 AssembledBlock::Thinking { text, signature } => Some(ContentBlock::Thinking {
707 thinking: text,
708 signature,
709 }),
710 AssembledBlock::ToolUse {
711 id,
712 name,
713 input_json,
714 } => {
715 let input: Value = if input_json.is_empty() {
716 json!({})
717 } else {
718 serde_json::from_str(&input_json).unwrap_or(json!({}))
719 };
720 Some(ContentBlock::ToolUse { id, name, input })
721 }
722 AssembledBlock::ServerToolUse {
723 id,
724 name,
725 input_json,
726 } => {
727 let input: Value = if input_json.is_empty() {
728 json!({})
729 } else {
730 serde_json::from_str(&input_json).unwrap_or(json!({}))
731 };
732 Some(ContentBlock::ServerToolUse { id, name, input })
733 }
734 AssembledBlock::WebSearchResult {
735 tool_use_id,
736 content_json,
737 } => {
738 let content: Value = if content_json.is_empty() {
739 json!({"results": []})
740 } else {
741 serde_json::from_str(&content_json).unwrap_or(json!({"results": []}))
742 };
743 Some(ContentBlock::WebSearchResult {
744 tool_use_id,
745 content: parse_web_search_content(&content),
746 })
747 }
748 }
749 }
750}
751
752fn parse_usage(u: &Value) -> Usage {
755 Usage {
756 input_tokens: u["input_tokens"].as_u64().unwrap_or(0) as u32,
757 output_tokens: u["output_tokens"].as_u64().unwrap_or(0) as u32,
758 cache_creation_input_tokens: u["cache_creation_input_tokens"].as_u64().unwrap_or(0) as u32,
759 cache_read_input_tokens: u["cache_read_input_tokens"].as_u64().unwrap_or(0) as u32,
760 }
761}
762
763fn merge_usage(mut acc: Usage, u: &Value) -> Usage {
767 let fresh = parse_usage(u);
768 if fresh.input_tokens > 0 {
769 acc.input_tokens = fresh.input_tokens;
770 }
771 if fresh.output_tokens > 0 {
772 acc.output_tokens = fresh.output_tokens;
773 }
774 if fresh.cache_creation_input_tokens > 0 {
775 acc.cache_creation_input_tokens = fresh.cache_creation_input_tokens;
776 }
777 if fresh.cache_read_input_tokens > 0 {
778 acc.cache_read_input_tokens = fresh.cache_read_input_tokens;
779 }
780 acc
781}
782
783fn parse_web_search_content(value: &serde_json::Value) -> crate::providers::WebSearchContent {
785 let results = value["results"]
786 .as_array()
787 .map(|arr| {
788 arr.iter()
789 .filter_map(|item| {
790 Some(crate::providers::WebSearchResultItem {
791 title: item["title"].as_str().map(String::from),
792 url: item["url"].as_str()?.to_string(),
793 encrypted_content: item["encrypted_content"].as_str().map(String::from),
794 snippet: item["snippet"].as_str().map(String::from),
795 })
796 })
797 .collect()
798 })
799 .unwrap_or_default();
800
801 crate::providers::WebSearchContent { results }
802}
803
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames separated by CRLF blank lines are split one at a time.
    #[test]
    fn take_next_sse_frame_supports_crlf_delimiter() {
        let mut buffer = String::new();
        buffer.push_str("event: message_start\r\n");
        buffer.push_str("data: {\"type\":\"message_start\"}\r\n\r\n");
        buffer.push_str("data: {\"type\":\"message_stop\"}\r\n\r\n");

        let first = take_next_sse_frame(&mut buffer).expect("first frame");
        assert!(first.contains("message_start"));

        let second = take_next_sse_frame(&mut buffer).expect("second frame");
        assert!(second.contains("message_stop"));
        assert!(buffer.is_empty());
    }

    /// A final event without a terminating blank line is still returned.
    #[test]
    fn take_trailing_sse_frame_returns_unterminated_event() {
        let mut buffer = String::from("data: {\"type\":\"message_stop\"}\r\n");

        let frame = take_trailing_sse_frame(&mut buffer).expect("trailing frame");

        assert_eq!(frame, "data: {\"type\":\"message_stop\"}");
        assert!(buffer.is_empty());
    }

    /// The space after `data:` is optional.
    #[test]
    fn extract_sse_data_line_supports_optional_space() {
        let cases = [
            ("event: x\r\ndata: {\"k\":1}\r", "{\"k\":1}"),
            ("event: x\r\ndata:{\"k\":2}\r", "{\"k\":2}"),
        ];
        for (frame, expected) in cases {
            assert_eq!(extract_sse_data_line(frame), Some(expected.to_string()));
        }
    }

    /// Content accumulated before an abrupt end of stream is preserved.
    #[test]
    fn finalize_incomplete_stream_keeps_accumulated_content() {
        let partial = vec![AssembledBlock::Text("partial reply".to_string())];

        let response = finalize_incomplete_stream(partial, StopReason::EndTurn, Usage::default());

        assert_eq!(response.stop_reason, StopReason::EndTurn);
        assert_eq!(response.content.len(), 1);
        match &response.content[0] {
            ContentBlock::Text { text } => assert_eq!(text, "partial reply"),
            other => panic!("unexpected block: {other:?}"),
        }
    }
}