1use anyhow::Result;
2use async_trait::async_trait;
3use futures_util::StreamExt;
4use log::debug;
5use serde_json::{Value, json};
6use tokio::sync::mpsc;
7
8use crate::tools::ToolDefinition;
9
10use super::{
11 ChatRequest, ChatResponse, ContentBlock, Message, MessageContent, Provider, Role, StopReason,
12 StreamEvent, Usage,
13};
14
15pub struct AnthropicProvider {
16 api_key: String,
17 model: String,
18 base_url: String,
19 client: reqwest::Client,
20 is_dashscope: bool,
22}
23
24impl AnthropicProvider {
25 pub fn new(api_key: String, model: String, base_url: String) -> Self {
26 let is_dashscope = base_url.contains("dashscope.aliyuncs.com");
27 Self {
28 api_key,
29 model,
30 base_url,
31 client: reqwest::Client::new(),
32 is_dashscope,
33 }
34 }
35
36 fn convert_messages(&self, messages: &[Message]) -> Vec<Value> {
37 messages
38 .iter()
39 .filter(|m| m.role != Role::System)
40 .map(|m| {
41 let role = match m.role {
42 Role::User | Role::Tool => "user",
43 Role::Assistant => "assistant",
44 Role::System => unreachable!(),
45 };
46
47 let content = match &m.content {
48 MessageContent::Text(text) => json!(text),
49 MessageContent::Blocks(blocks) => {
50 let converted: Vec<Value> = blocks
51 .iter()
52 .map(|b| match b {
53 ContentBlock::Text { text } => json!({"type": "text", "text": text}),
54 ContentBlock::ToolUse { id, name, input } => {
55 json!({"type": "tool_use", "id": id, "name": name, "input": input})
56 }
57 ContentBlock::ToolResult { tool_use_id, content } => {
58 json!({"type": "tool_result", "tool_use_id": tool_use_id, "content": content})
59 }
60 ContentBlock::Thinking { thinking, signature } => {
61 let mut obj = json!({"type": "thinking", "thinking": thinking});
62 if let Some(sig) = signature {
63 obj["signature"] = json!(sig);
64 }
65 obj
66 }
67 ContentBlock::ServerToolUse { id, name, input } => {
68 json!({"type": "server_tool_use", "id": id, "name": name, "input": input})
69 }
70 ContentBlock::WebSearchResult { tool_use_id, content } => {
71 json!({"type": "web_search_tool_result", "tool_use_id": tool_use_id, "content": content})
72 }
73 })
74 .collect();
75 json!(converted)
76 }
77 };
78
79 json!({"role": role, "content": content})
80 })
81 .collect()
82 }
83
84 fn convert_tools_with_caching(&self, tools: &[ToolDefinition], enable_caching: bool) -> Vec<Value> {
86 let mut converted: Vec<Value> = tools
87 .iter()
88 .map(|t| {
89 json!({
90 "name": t.name,
91 "description": t.description,
92 "input_schema": t.parameters,
93 })
94 })
95 .collect();
96
97 if enable_caching && !converted.is_empty() {
99 let last_idx = converted.len() - 1;
100 if let Some(obj) = converted[last_idx].as_object_mut() {
101 obj.insert("cache_control".to_string(), json!({"type": "ephemeral"}));
102 }
103 }
104
105 converted
106 }
107
108 fn build_body(&self, request: &ChatRequest) -> Value {
110 let mut body = json!({
111 "model": self.model,
112 "max_tokens": request.max_tokens,
113 "messages": self.convert_messages(&request.messages),
114 });
115
116 if request.enable_caching && !self.is_dashscope {
118 if let Some(system) = &request.system {
119 body["system"] = json!([
121 {
122 "type": "text",
123 "text": system,
124 "cache_control": {"type": "ephemeral"}
125 }
126 ]);
127 }
128 } else if let Some(system) = &request.system {
129 body["system"] = json!(system);
130 }
131
132 if !request.tools.is_empty() {
133 let tools = self.convert_tools_with_caching(&request.tools, request.enable_caching && !self.is_dashscope);
134 body["tools"] = json!(tools);
135 }
136
137 if !request.server_tools.is_empty() {
138 body["tools"] = json!(body["tools"]
139 .as_array()
140 .map(|t| {
141 let mut tools = t.clone();
142 for st in &request.server_tools {
143 tools.push(serde_json::to_value(st).unwrap_or_default());
144 }
145 tools
146 })
147 .unwrap_or_else(|| request.server_tools.iter().map(|st| serde_json::to_value(st).unwrap_or_default()).collect()));
148 }
149
150 if request.think && !self.is_dashscope {
152 body["thinking"] = thinking_config(&self.model);
153 }
154
155 body
156 }
157}
158
159fn thinking_config(model: &str) -> Value {
164 let adaptive = model.contains("opus-4-7") || model.contains("opus-4.7");
165 if adaptive {
166 json!({"type": "adaptive"})
167 } else {
168 json!({"type": "enabled", "budget_tokens": 2048})
169 }
170}
171
172#[async_trait]
173impl Provider for AnthropicProvider {
174 fn context_size(&self) -> Option<u32> {
175 context_window_for(&self.model)
176 }
177
178 fn clone_box(&self) -> Box<dyn Provider> {
179 Box::new(Self {
180 api_key: self.api_key.clone(),
181 model: self.model.clone(),
182 base_url: self.base_url.clone(),
183 client: reqwest::Client::new(),
184 is_dashscope: self.is_dashscope,
185 })
186 }
187
188 async fn chat(&self, request: ChatRequest) -> Result<ChatResponse> {
189 let body = self.build_body(&request);
190
191 let url = format!("{}/v1/messages", self.base_url);
192 let mut req = self
193 .client
194 .post(&url)
195 .header("User-Agent", "curl/8.0")
196 .json(&body);
197
198 if self.is_dashscope {
200 req = req.header("Authorization", format!("Bearer {}", self.api_key));
201 } else {
202 req = req.header("x-api-key", &self.api_key)
203 .header("anthropic-version", "2023-06-01")
204 .header("anthropic-beta", "prompt-caching-2024-07-31");
205 }
206
207 let response = req.send().await?;
208
209 let status = response.status();
210 let response_body: Value = response.json().await?;
211
212 if !status.is_success() {
213 let err_msg = response_body["error"]["message"]
214 .as_str()
215 .unwrap_or("unknown error");
216 anyhow::bail!("Anthropic API error ({}): {}", status, err_msg);
217 }
218
219 let stop_reason = match response_body["stop_reason"].as_str() {
220 Some("tool_use") => StopReason::ToolUse,
221 Some("max_tokens") => StopReason::MaxTokens,
222 _ => StopReason::EndTurn,
223 };
224
225 let content = response_body["content"]
226 .as_array()
227 .unwrap_or(&vec![])
228 .iter()
229 .filter_map(|block| match block["type"].as_str()? {
230 "text" => Some(ContentBlock::Text {
231 text: block["text"].as_str()?.to_string(),
232 }),
233 "tool_use" => Some(ContentBlock::ToolUse {
234 id: block["id"].as_str()?.to_string(),
235 name: block["name"].as_str()?.to_string(),
236 input: block["input"].clone(),
237 }),
238 "thinking" => Some(ContentBlock::Thinking {
239 thinking: block["thinking"].as_str()?.to_string(),
240 signature: block["signature"].as_str().map(String::from),
241 }),
242 "server_tool_use" => Some(ContentBlock::ServerToolUse {
243 id: block["id"].as_str()?.to_string(),
244 name: block["name"].as_str()?.to_string(),
245 input: block["input"].clone(),
246 }),
247 "web_search_tool_result" => {
248 let tool_use_id = block["tool_use_id"].as_str()?.to_string();
249 let content = parse_web_search_content(&block["content"]);
250 Some(ContentBlock::WebSearchResult {
251 tool_use_id,
252 content,
253 })
254 }
255 _ => None,
256 })
257 .collect();
258
259 Ok(ChatResponse {
260 content,
261 stop_reason,
262 usage: parse_usage(&response_body["usage"]),
263 })
264 }
265
266 async fn chat_stream(&self, request: ChatRequest) -> Result<mpsc::Receiver<StreamEvent>> {
267 let mut body = self.build_body(&request);
268 body["stream"] = json!(true);
269
270 let url = format!("{}/v1/messages", self.base_url);
271 let mut req = self
272 .client
273 .post(&url)
274 .header("User-Agent", "curl/8.0")
275 .json(&body);
276
277 if self.is_dashscope {
279 req = req
280 .header("Authorization", format!("Bearer {}", self.api_key))
281 .header("X-DashScope-SSE", "enable");
282 } else {
283 req = req.header("x-api-key", &self.api_key)
284 .header("anthropic-version", "2023-06-01")
285 .header("anthropic-beta", "prompt-caching-2024-07-31");
286 }
287
288 let response = req.send().await?;
289
290 if !response.status().is_success() {
291 let status = response.status();
292 let text = response.text().await.unwrap_or_default();
293 anyhow::bail!("Anthropic API error ({}): {}", status, text);
294 }
295
296 let (tx, rx) = mpsc::channel(64);
297 tokio::spawn(async move {
298 let mut stream = response.bytes_stream();
299 let mut buffer = String::new();
300 let mut sent_first_byte = false;
301
302 let mut blocks: Vec<AssembledBlock> = Vec::new();
304 let mut stop_reason = StopReason::EndTurn;
305 let mut usage = Usage::default();
306
307 while let Some(chunk) = stream.next().await {
308 let chunk = match chunk {
309 Ok(c) => c,
310 Err(e) => {
311 let _ = tx
312 .send(StreamEvent::Error(format!("stream read error: {}", e)))
313 .await;
314 return;
315 }
316 };
317
318 if !sent_first_byte {
319 sent_first_byte = true;
320 let _ = tx.send(StreamEvent::FirstByte).await;
321 }
322
323 buffer.push_str(&String::from_utf8_lossy(&chunk));
324
325 while let Some(frame) = take_next_sse_frame(&mut buffer) {
326 if handle_sse_frame(
327 &frame,
328 &mut blocks,
329 &mut stop_reason,
330 &mut usage,
331 &tx,
332 )
333 .await
334 {
335 return;
336 }
337 }
338 }
339
340 if let Some(frame) = take_trailing_sse_frame(&mut buffer)
341 && handle_sse_frame(&frame, &mut blocks, &mut stop_reason, &mut usage, &tx).await {
342 return;
343 }
344
345 if sent_first_byte {
346 debug!(
347 "stream ended without explicit message_stop; finalizing best-effort"
348 );
349 let _ = tx
350 .send(StreamEvent::Done(finalize_incomplete_stream(
351 std::mem::take(&mut blocks),
352 stop_reason,
353 usage,
354 )))
355 .await;
356 } else {
357 let _ = tx
358 .send(StreamEvent::Error(
359 "stream ended before any events were received".to_string(),
360 ))
361 .await;
362 }
363 });
364
365 Ok(rx)
366 }
367}
368
/// Removes and returns the first complete SSE frame from `buffer`.
///
/// A frame ends at the earliest blank-line delimiter, either `\n\n` or
/// `\r\n\r\n`. The delimiter is consumed but not returned. Returns `None`
/// and leaves `buffer` untouched when no delimiter is present yet.
fn take_next_sse_frame(buffer: &mut String) -> Option<String> {
    let (start, width) = ["\n\n", "\r\n\r\n"]
        .iter()
        .filter_map(|delim| buffer.find(*delim).map(|at| (at, delim.len())))
        .min_by_key(|&(at, _)| at)?;

    // Keep what follows the delimiter in `buffer`; hand back what precedes it.
    let remainder = buffer.split_off(start + width);
    buffer.truncate(start);
    Some(std::mem::replace(buffer, remainder))
}
389
/// Empties `buffer` and returns its whitespace-trimmed content as a final
/// frame, or `None` when nothing but whitespace was left.
fn take_trailing_sse_frame(buffer: &mut String) -> Option<String> {
    let leftover = std::mem::take(buffer);
    // `trim` already strips carriage returns along with other whitespace.
    let frame = leftover.trim();
    if frame.is_empty() {
        return None;
    }
    Some(frame.to_string())
}
399
/// Extracts the `data` payload of one SSE frame.
///
/// Each line starting with `data:` contributes its value, with one optional
/// leading space removed. When a frame carries several `data:` lines, their
/// values are joined with `\n`, as the server-sent events format prescribes;
/// a frame with a single `data:` line yields that value unchanged. Returns
/// `None` when the frame has no `data:` line.
fn extract_sse_data_line(frame: &str) -> Option<String> {
    let values: Vec<&str> = frame
        .lines()
        .filter_map(|line| {
            let rest = line.trim_end_matches('\r').strip_prefix("data:")?;
            Some(rest.strip_prefix(' ').unwrap_or(rest))
        })
        .collect();

    if values.is_empty() {
        None
    } else {
        Some(values.join("\n"))
    }
}
413
414async fn handle_sse_frame(
415 frame: &str,
416 blocks: &mut Vec<AssembledBlock>,
417 stop_reason: &mut StopReason,
418 usage: &mut Usage,
419 tx: &mpsc::Sender<StreamEvent>,
420) -> bool {
421 let Some(data_line) = extract_sse_data_line(frame) else {
422 return false;
423 };
424
425 let evt: Value = match serde_json::from_str(&data_line) {
426 Ok(v) => v,
427 Err(_) => return false,
428 };
429
430 handle_sse_event(evt, blocks, stop_reason, usage, tx).await
431}
432
433async fn handle_sse_event(
434 evt: Value,
435 blocks: &mut Vec<AssembledBlock>,
436 stop_reason: &mut StopReason,
437 usage: &mut Usage,
438 tx: &mpsc::Sender<StreamEvent>,
439) -> bool {
440 match evt["type"].as_str().unwrap_or("") {
441 "message_start" => {
442 *usage = merge_usage(usage.clone(), &evt["message"]["usage"]);
447 debug!(
448 "message_start: usage_json={}",
449 serde_json::to_string(&evt["message"]["usage"]).unwrap_or_default()
450 );
451 debug!(
452 "message_start parsed: input={}, output={}, cache_read={}, cache_created={}",
453 usage.input_tokens, usage.output_tokens,
454 usage.cache_read_input_tokens, usage.cache_creation_input_tokens
455 );
456 }
457 "content_block_start" => {
458 let idx = evt["index"].as_u64().unwrap_or(0) as usize;
459 let block = &evt["content_block"];
460 let kind = block["type"].as_str().unwrap_or("");
461 while blocks.len() <= idx {
462 blocks.push(AssembledBlock::default());
463 }
464 match kind {
465 "text" => {
466 blocks[idx] = AssembledBlock::Text(String::new());
467 }
468 "thinking" => {
469 blocks[idx] = AssembledBlock::Thinking {
470 text: String::new(),
471 signature: None,
472 };
473 }
474 "tool_use" => {
475 let id = block["id"].as_str().unwrap_or("").to_string();
476 let name = block["name"].as_str().unwrap_or("").to_string();
477 blocks[idx] = AssembledBlock::ToolUse {
478 id: id.clone(),
479 name: name.clone(),
480 input_json: String::new(),
481 };
482 let _ = tx.send(StreamEvent::ToolUseStart { id, name }).await;
483 }
484 "server_tool_use" => {
485 let id = block["id"].as_str().unwrap_or("").to_string();
486 let name = block["name"].as_str().unwrap_or("").to_string();
487 blocks[idx] = AssembledBlock::ServerToolUse {
488 id: id.clone(),
489 name: name.clone(),
490 input_json: String::new(),
491 };
492 let _ = tx.send(StreamEvent::ToolUseStart { id, name }).await;
493 }
494 "web_search_tool_result" => {
495 let tool_use_id = block["tool_use_id"].as_str().unwrap_or("").to_string();
496 blocks[idx] = AssembledBlock::WebSearchResult {
497 tool_use_id,
498 content_json: String::new(),
499 };
500 }
501 _ => {}
502 }
503 }
504 "content_block_delta" => {
505 let idx = evt["index"].as_u64().unwrap_or(0) as usize;
506 let delta = &evt["delta"];
507 let dt = delta["type"].as_str().unwrap_or("");
508 if idx >= blocks.len() {
509 return false;
510 }
511 match (dt, &mut blocks[idx]) {
512 ("text_delta", AssembledBlock::Text(buf)) => {
513 if let Some(t) = delta["text"].as_str() {
514 buf.push_str(t);
515 let _ = tx.send(StreamEvent::TextDelta(t.to_string())).await;
516 }
517 }
518 ("thinking_delta", AssembledBlock::Thinking { text, .. }) => {
519 if let Some(t) = delta["thinking"].as_str() {
520 text.push_str(t);
521 let _ = tx.send(StreamEvent::ThinkingDelta(t.to_string())).await;
522 }
523 }
524 ("signature_delta", AssembledBlock::Thinking { signature, .. }) => {
525 if let Some(s) = delta["signature"].as_str() {
526 signature.get_or_insert_with(String::new).push_str(s);
527 }
528 }
529 ("input_json_delta", AssembledBlock::ToolUse { input_json, .. }) => {
530 if let Some(p) = delta["partial_json"].as_str() {
531 input_json.push_str(p);
532 let _ = tx
533 .send(StreamEvent::ToolInputDelta {
534 bytes_so_far: input_json.len(),
535 })
536 .await;
537 }
538 }
539 ("input_json_delta", AssembledBlock::ServerToolUse { input_json, .. }) => {
540 if let Some(p) = delta["partial_json"].as_str() {
541 input_json.push_str(p);
542 let _ = tx
543 .send(StreamEvent::ToolInputDelta {
544 bytes_so_far: input_json.len(),
545 })
546 .await;
547 }
548 }
549 _ => {}
550 }
551 }
552 "message_delta" => {
553 if let Some(sr) = evt["delta"]["stop_reason"].as_str() {
554 *stop_reason = match sr {
555 "tool_use" => StopReason::ToolUse,
556 "max_tokens" => StopReason::MaxTokens,
557 _ => StopReason::EndTurn,
558 };
559 }
560 *usage = merge_usage(usage.clone(), &evt["usage"]);
564 debug!(
565 "message_delta: input={}, output={}, cache_read={}, cache_created={}",
566 usage.input_tokens, usage.output_tokens,
567 usage.cache_read_input_tokens, usage.cache_creation_input_tokens
568 );
569 }
570 "message_stop" => {
571 debug!(
572 "Message completed: stop_reason={}, usage={}",
573 match *stop_reason {
574 StopReason::EndTurn => "end_turn",
575 StopReason::ToolUse => "tool_use",
576 StopReason::MaxTokens => "max_tokens",
577 },
578 usage.output_tokens
579 );
580 debug!(
581 "message_stop final usage: cache_read={}, cache_created={}",
582 usage.cache_read_input_tokens, usage.cache_creation_input_tokens
583 );
584 let _ = tx
585 .send(StreamEvent::Done(finalize_incomplete_stream(
586 std::mem::take(blocks),
587 stop_reason.clone(),
588 usage.clone(),
589 )))
590 .await;
591 return true;
592 }
593 "error" => {
594 let msg = evt["error"]["message"]
595 .as_str()
596 .unwrap_or("unknown stream error")
597 .to_string();
598 let _ = tx.send(StreamEvent::Error(msg)).await;
599 return true;
600 }
601 _ => {}
602 }
603
604 false
605}
606
607fn build_chat_response(
608 blocks: Vec<AssembledBlock>,
609 stop_reason: StopReason,
610 usage: Usage,
611) -> ChatResponse {
612 let content: Vec<ContentBlock> = blocks.into_iter().filter_map(|b| b.finish()).collect();
613 ChatResponse {
614 content,
615 stop_reason,
616 usage,
617 }
618}
619
620fn finalize_incomplete_stream(
621 blocks: Vec<AssembledBlock>,
622 stop_reason: StopReason,
623 usage: Usage,
624) -> ChatResponse {
625 build_chat_response(blocks, stop_reason, usage)
626}
627
/// One content block while it is being assembled from streaming events.
#[derive(Default)]
enum AssembledBlock {
    /// Placeholder for an index that has not been started; `finish` drops it.
    #[default]
    Empty,
    /// Text accumulated so far.
    Text(String),
    /// Thinking text plus its signature, if any signature has arrived.
    Thinking {
        text: String,
        signature: Option<String>,
    },
    /// Tool call whose input is collected as raw JSON text.
    ToolUse {
        id: String,
        name: String,
        input_json: String,
    },
    /// Server-side tool call whose input is collected as raw JSON text.
    ServerToolUse {
        id: String,
        name: String,
        input_json: String,
    },
    /// Web-search result whose content is held as raw JSON text.
    WebSearchResult {
        tool_use_id: String,
        content_json: String,
    },
}
652
653impl AssembledBlock {
654 fn finish(self) -> Option<ContentBlock> {
655 match self {
656 AssembledBlock::Empty => None,
657 AssembledBlock::Text(text) => Some(ContentBlock::Text { text }),
658 AssembledBlock::Thinking { text, signature } => Some(ContentBlock::Thinking {
659 thinking: text,
660 signature,
661 }),
662 AssembledBlock::ToolUse {
663 id,
664 name,
665 input_json,
666 } => {
667 let input: Value = if input_json.is_empty() {
668 json!({})
669 } else {
670 serde_json::from_str(&input_json).unwrap_or(json!({}))
671 };
672 Some(ContentBlock::ToolUse { id, name, input })
673 }
674 AssembledBlock::ServerToolUse {
675 id,
676 name,
677 input_json,
678 } => {
679 let input: Value = if input_json.is_empty() {
680 json!({})
681 } else {
682 serde_json::from_str(&input_json).unwrap_or(json!({}))
683 };
684 Some(ContentBlock::ServerToolUse { id, name, input })
685 }
686 AssembledBlock::WebSearchResult {
687 tool_use_id,
688 content_json,
689 } => {
690 let content: Value = if content_json.is_empty() {
691 json!({"results": []})
692 } else {
693 serde_json::from_str(&content_json).unwrap_or(json!({"results": []}))
694 };
695 Some(ContentBlock::WebSearchResult {
696 tool_use_id,
697 content: parse_web_search_content(&content),
698 })
699 }
700 }
701 }
702}
703
704fn parse_usage(u: &Value) -> Usage {
707 Usage {
708 input_tokens: u["input_tokens"].as_u64().unwrap_or(0) as u32,
709 output_tokens: u["output_tokens"].as_u64().unwrap_or(0) as u32,
710 cache_creation_input_tokens: u["cache_creation_input_tokens"].as_u64().unwrap_or(0) as u32,
711 cache_read_input_tokens: u["cache_read_input_tokens"].as_u64().unwrap_or(0) as u32,
712 }
713}
714
715fn merge_usage(mut acc: Usage, u: &Value) -> Usage {
719 let fresh = parse_usage(u);
720 if fresh.input_tokens > 0 {
721 acc.input_tokens = fresh.input_tokens;
722 }
723 if fresh.output_tokens > 0 {
724 acc.output_tokens = fresh.output_tokens;
725 }
726 if fresh.cache_creation_input_tokens > 0 {
727 acc.cache_creation_input_tokens = fresh.cache_creation_input_tokens;
728 }
729 if fresh.cache_read_input_tokens > 0 {
730 acc.cache_read_input_tokens = fresh.cache_read_input_tokens;
731 }
732 acc
733}
734
735fn parse_web_search_content(value: &serde_json::Value) -> crate::providers::WebSearchContent {
737 let results = value["results"]
738 .as_array()
739 .map(|arr| {
740 arr.iter()
741 .filter_map(|item| {
742 Some(crate::providers::WebSearchResultItem {
743 title: item["title"].as_str().map(String::from),
744 url: item["url"].as_str()?.to_string(),
745 encrypted_content: item["encrypted_content"].as_str().map(String::from),
746 snippet: item["snippet"].as_str().map(String::from),
747 })
748 })
749 .collect()
750 })
751 .unwrap_or_default();
752
753 crate::providers::WebSearchContent { results }
754}
755
/// Estimates the context window, in tokens, for `model`.
///
/// A `CONTEXT_SIZE` environment variable holding a positive integer takes
/// precedence. Otherwise the lower-cased model name is matched against known
/// substrings in a fixed order, and 128,000 is returned when nothing matches.
/// As written, the function never returns `None`.
fn context_window_for(model: &str) -> Option<u32> {
    /// True when `name` contains at least one of `needles`.
    fn mentions_any(name: &str, needles: &[&str]) -> bool {
        needles.iter().any(|needle| name.contains(*needle))
    }

    let from_env = std::env::var("CONTEXT_SIZE")
        .ok()
        .and_then(|raw| raw.trim().parse::<u32>().ok())
        .filter(|&n| n > 0);
    if from_env.is_some() {
        return from_env;
    }

    let name = model.to_ascii_lowercase();

    // Order matters: earlier groups win when a name matches several.
    let window = if mentions_any(&name, &["[1m]", "opus-4-7", "opus-4.7"]) {
        1_000_000
    } else if mentions_any(
        &name,
        &["claude-3", "claude-4", "claude-opus", "claude-sonnet"],
    ) {
        200_000
    } else if mentions_any(&name, &["claude-2", "claude-instant"]) {
        100_000
    } else if mentions_any(&name, &["kimi", "deepseek", "glm"]) {
        128_000
    } else if name.contains("qwen") {
        if mentions_any(&name, &["qwen-max", "qwen2.5"]) {
            128_000
        } else {
            32_000
        }
    } else {
        128_000
    };

    Some(window)
}
809
#[cfg(test)]
mod tests {
    use super::*;

    /// Frames separated by CRLF blank lines are split one at a time.
    #[test]
    fn take_next_sse_frame_supports_crlf_delimiter() {
        let mut buffer = String::new();
        buffer.push_str("event: message_start\r\n");
        buffer.push_str("data: {\"type\":\"message_start\"}\r\n\r\n");
        buffer.push_str("data: {\"type\":\"message_stop\"}\r\n\r\n");

        let first = take_next_sse_frame(&mut buffer).expect("first frame");
        assert!(first.contains("message_start"));

        let second = take_next_sse_frame(&mut buffer).expect("second frame");
        assert!(second.contains("message_stop"));
        assert!(buffer.is_empty());
    }

    /// An event without its closing blank line is still returned at the end.
    #[test]
    fn take_trailing_sse_frame_returns_unterminated_event() {
        let mut buffer = String::from("data: {\"type\":\"message_stop\"}\r\n");

        let frame = take_trailing_sse_frame(&mut buffer).expect("trailing frame");

        assert_eq!(frame, "data: {\"type\":\"message_stop\"}");
        assert!(buffer.is_empty());
    }

    /// The space after `data:` is optional.
    #[test]
    fn extract_sse_data_line_supports_optional_space() {
        let cases = [
            ("event: x\r\ndata: {\"k\":1}\r", "{\"k\":1}"),
            ("event: x\r\ndata:{\"k\":2}\r", "{\"k\":2}"),
        ];

        for (frame, expected) in cases.iter() {
            assert_eq!(extract_sse_data_line(frame), Some(expected.to_string()));
        }
    }

    /// Content accumulated before the stream broke off is kept.
    #[test]
    fn finalize_incomplete_stream_keeps_accumulated_content() {
        let blocks = vec![AssembledBlock::Text("partial reply".to_string())];

        let response = finalize_incomplete_stream(blocks, StopReason::EndTurn, Usage::default());

        assert_eq!(response.stop_reason, StopReason::EndTurn);
        assert_eq!(response.content.len(), 1);
        match &response.content[0] {
            ContentBlock::Text { text } => assert_eq!(text, "partial reply"),
            other => panic!("unexpected block: {other:?}"),
        }
    }
}
867