1use async_stream::stream;
2use futures::StreamExt;
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{Level, enabled, info_span};
6use tracing_futures::Instrument;
7
8use super::completion::{
9 AnthropicCompatibleProvider, Content, GenericCompletionModel, Message, SystemContent,
10 ToolChoice, Usage, apply_prompt_cache_control, build_tool_definitions,
11 resolve_top_level_cache_control, split_system_messages_from_history,
12 supports_mid_conversation_system_messages,
13};
14use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
15use crate::http_client::sse::{Event, GenericEventSource};
16use crate::http_client::{self, HttpClientExt};
17use crate::json_utils::merge_inplace;
18use crate::message::ReasoningContent;
19use crate::streaming::{
20 self, RawStreamingChoice, RawStreamingToolCall, StreamingResult, ToolCallDeltaContent,
21};
22use crate::telemetry::SpanCombinator;
23use crate::wasm_compat::{WasmCompatSend, WasmCompatSync};
24use std::collections::HashMap;
25
26#[derive(Debug, Deserialize)]
27#[serde(tag = "type", rename_all = "snake_case")]
28pub enum StreamingEvent {
29 MessageStart {
30 message: MessageStart,
31 },
32 ContentBlockStart {
33 index: usize,
34 content_block: Content,
35 },
36 ContentBlockDelta {
37 index: usize,
38 delta: ContentDelta,
39 },
40 ContentBlockStop {
41 index: usize,
42 },
43 MessageDelta {
44 delta: MessageDelta,
45 usage: PartialUsage,
46 },
47 MessageStop,
48 Ping,
49 #[serde(other)]
50 Unknown,
51}
52
53#[derive(Debug, Deserialize)]
54pub struct MessageStart {
55 pub id: String,
56 pub role: String,
57 pub content: Vec<Content>,
58 pub model: String,
59 pub stop_reason: Option<String>,
60 pub stop_sequence: Option<String>,
61 pub usage: Usage,
62}
63
64#[derive(Debug, Deserialize)]
65#[serde(tag = "type", rename_all = "snake_case")]
66pub enum ContentDelta {
67 TextDelta {
68 text: String,
69 },
70 InputJsonDelta {
71 partial_json: String,
72 },
73 ThinkingDelta {
74 thinking: String,
75 },
76 SignatureDelta {
77 signature: String,
78 },
79 CitationsDelta {
80 citation: super::completion::Citation,
81 },
82 #[serde(other)]
86 Unknown,
87}
88
89#[derive(Debug, Deserialize)]
90pub struct MessageDelta {
91 pub stop_reason: Option<String>,
92 pub stop_sequence: Option<String>,
93}
94
95#[derive(Debug, Deserialize, Clone, Serialize, Default)]
96pub struct PartialUsage {
97 pub output_tokens: usize,
98 #[serde(default)]
99 pub input_tokens: Option<usize>,
100 #[serde(default)]
101 pub cache_creation_input_tokens: Option<u64>,
102 #[serde(default)]
103 pub cache_read_input_tokens: Option<u64>,
104}
105
106impl GetTokenUsage for PartialUsage {
107 fn token_usage(&self) -> Option<crate::completion::Usage> {
108 let mut usage = crate::completion::Usage::new();
109
110 usage.input_tokens = self.input_tokens.unwrap_or_default() as u64;
111 usage.output_tokens = self.output_tokens as u64;
112 usage.cached_input_tokens = self.cache_read_input_tokens.unwrap_or(0);
113 usage.cache_creation_input_tokens = self.cache_creation_input_tokens.unwrap_or(0);
114 usage.total_tokens = usage.input_tokens
115 + usage.cached_input_tokens
116 + usage.cache_creation_input_tokens
117 + usage.output_tokens;
118 Some(usage)
119 }
120}
121
122#[derive(Default)]
123struct ToolCallState {
124 name: String,
125 id: String,
126 internal_call_id: String,
127 input_json: String,
128}
129
130struct ServerToolUseState {
131 name: String,
132 id: String,
133 initial_input: Value,
134 input_json: String,
135}
136
137#[derive(Default)]
138struct ThinkingState {
139 thinking: String,
140 signature: String,
141}
142
143#[derive(Clone, Debug, Deserialize, Serialize)]
144pub struct StreamingCompletionResponse {
145 pub usage: PartialUsage,
146}
147
148impl GetTokenUsage for StreamingCompletionResponse {
149 fn token_usage(&self) -> Option<crate::completion::Usage> {
150 let mut usage = crate::completion::Usage::new();
151 usage.input_tokens = self.usage.input_tokens.unwrap_or(0) as u64;
152 usage.output_tokens = self.usage.output_tokens as u64;
153 usage.cached_input_tokens = self.usage.cache_read_input_tokens.unwrap_or(0);
154 usage.cache_creation_input_tokens = self.usage.cache_creation_input_tokens.unwrap_or(0);
155 usage.total_tokens = usage.input_tokens
156 + usage.cached_input_tokens
157 + usage.cache_creation_input_tokens
158 + usage.output_tokens;
159
160 Some(usage)
161 }
162}
163
164impl<Ext, T> GenericCompletionModel<Ext, T>
165where
166 T: HttpClientExt + Clone + Default + 'static,
167 Ext: AnthropicCompatibleProvider + Clone + WasmCompatSend + WasmCompatSync + 'static,
168{
169 pub(crate) async fn stream(
170 &self,
171 mut completion_request: CompletionRequest,
172 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
173 {
174 let request_model = completion_request
175 .model
176 .clone()
177 .unwrap_or_else(|| self.model.clone());
178 let span = if tracing::Span::current().is_disabled() {
179 info_span!(
180 target: "rig::completions",
181 "chat_streaming",
182 gen_ai.operation.name = "chat_streaming",
183 gen_ai.provider.name = Ext::PROVIDER_NAME,
184 gen_ai.request.model = &request_model,
185 gen_ai.system_instructions = &completion_request.preamble,
186 gen_ai.response.id = tracing::field::Empty,
187 gen_ai.response.model = &request_model,
188 gen_ai.usage.output_tokens = tracing::field::Empty,
189 gen_ai.usage.input_tokens = tracing::field::Empty,
190 gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
191 gen_ai.usage.cache_creation.input_tokens = tracing::field::Empty,
192 gen_ai.input.messages = tracing::field::Empty,
193 gen_ai.output.messages = tracing::field::Empty,
194 )
195 } else {
196 tracing::Span::current()
197 };
198 let max_tokens = if let Some(tokens) = completion_request.max_tokens {
199 tokens
200 } else if let Some(tokens) = self.default_max_tokens {
201 tokens
202 } else {
203 return Err(CompletionError::RequestError(
204 "`max_tokens` must be set for Anthropic".into(),
205 ));
206 };
207
208 let docs = completion_request.normalized_documents();
209 let (history_system, chat_history) = split_system_messages_from_history(
210 completion_request.chat_history.into_iter().collect(),
211 supports_mid_conversation_system_messages(&request_model),
212 );
213 let mut full_history = vec![];
214 if let Some(docs) = docs {
215 full_history.push(docs);
216 }
217 full_history.extend(chat_history);
218
219 let mut messages = full_history
220 .into_iter()
221 .map(Message::try_from)
222 .collect::<Result<Vec<Message>, _>>()?;
223
224 let mut system: Vec<SystemContent> =
226 if let Some(preamble) = completion_request.preamble.as_ref() {
227 if preamble.is_empty() {
228 vec![]
229 } else {
230 vec![SystemContent::Text {
231 text: preamble.clone(),
232 cache_control: None,
233 }]
234 }
235 } else {
236 vec![]
237 };
238 system.extend(history_system);
239
240 let mut additional_params_payload = completion_request
241 .additional_params
242 .take()
243 .unwrap_or(Value::Null);
244 let top_level_cache_control = resolve_top_level_cache_control(
245 self.automatic_caching,
246 self.automatic_caching_ttl.clone(),
247 &mut additional_params_payload,
248 )?;
249 let mut tools =
250 build_tool_definitions(completion_request.tools, &mut additional_params_payload)?;
251
252 apply_prompt_cache_control(
253 &mut system,
254 &mut messages,
255 &mut tools,
256 self.prompt_caching,
257 top_level_cache_control.as_ref(),
258 )?;
259
260 let mut body = json!({
261 "model": request_model,
262 "messages": messages,
263 "max_tokens": max_tokens,
264 "stream": true,
265 });
266
267 if let Some(cache_control) = top_level_cache_control {
270 merge_inplace(
271 &mut body,
272 json!({ "cache_control": serde_json::to_value(&cache_control)? }),
273 );
274 }
275
276 if !system.is_empty() {
278 merge_inplace(&mut body, json!({ "system": system }));
279 }
280
281 if let Some(temperature) = completion_request.temperature {
282 merge_inplace(&mut body, json!({ "temperature": temperature }));
283 }
284
285 if !tools.is_empty() {
286 merge_inplace(
287 &mut body,
288 json!({
289 "tools": tools,
290 "tool_choice": ToolChoice::Auto,
291 }),
292 );
293 }
294
295 if !additional_params_payload.is_null() {
296 merge_inplace(&mut body, additional_params_payload)
297 }
298
299 if enabled!(Level::TRACE) {
300 tracing::trace!(
301 target: "rig::completions",
302 "Anthropic completion request: {}",
303 serde_json::to_string_pretty(&body)?
304 );
305 }
306
307 let body: Vec<u8> = serde_json::to_vec(&body)?;
308
309 let req = self
310 .client
311 .post("/v1/messages")?
312 .body(body)
313 .map_err(http_client::Error::Protocol)?;
314
315 let stream = GenericEventSource::new(self.client.clone(), req);
316
317 let stream: StreamingResult<StreamingCompletionResponse> = Box::pin(stream! {
319 let mut current_tool_call: Option<ToolCallState> = None;
320 let mut server_tool_uses: HashMap<usize, ServerToolUseState> = HashMap::new();
321 let mut current_thinking: Option<ThinkingState> = None;
322 let mut sse_stream = Box::pin(stream);
323 let mut input_tokens = 0;
324 let mut final_usage = None;
325
326 let mut text_content = String::new();
327
328 while let Some(sse_result) = sse_stream.next().await {
329 match sse_result {
330 Ok(Event::Open) => {}
331 Ok(Event::Message(sse)) => {
332 match serde_json::from_str::<StreamingEvent>(&sse.data) {
334 Ok(event) => {
335 match &event {
336 StreamingEvent::MessageStart { message } => {
337 input_tokens = message.usage.input_tokens;
338
339 let span = tracing::Span::current();
340 span.record("gen_ai.response.id", &message.id);
341 span.record("gen_ai.response.model", &message.model);
342 },
343 StreamingEvent::MessageDelta { delta, usage } => {
344 if delta.stop_reason.is_some() {
345 let usage = PartialUsage {
349 output_tokens: usage.output_tokens,
350 input_tokens: usize::try_from(input_tokens).ok(),
351 cache_creation_input_tokens: usage.cache_creation_input_tokens,
352 cache_read_input_tokens: usage.cache_read_input_tokens
353 };
354
355 let span = tracing::Span::current();
356 span.record_token_usage(&usage);
357 final_usage = Some(usage);
358 break;
359 }
360 }
361 _ => {}
362 }
363
364 if let Some(result) = handle_event(
365 &event,
366 &mut current_tool_call,
367 &mut server_tool_uses,
368 &mut current_thinking,
369 ) {
370 if let Ok(RawStreamingChoice::Message(ref text)) = result {
371 text_content += text;
372 }
373 yield result;
374 }
375 },
376 Err(e) => {
377 if !sse.data.trim().is_empty() {
378 yield Err(CompletionError::ResponseError(
379 format!("Failed to parse JSON: {} (Data: {})", e, sse.data)
380 ));
381 }
382 }
383 }
384 },
385 Err(e) => {
386 yield Err(CompletionError::ProviderError(format!("SSE Error: {e}")));
387 break;
388 }
389 }
390 }
391
392 sse_stream.close();
394
395 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
396 usage: final_usage.unwrap_or_default()
397 }))
398 }.instrument(span));
399
400 Ok(streaming::StreamingCompletionResponse::stream(stream))
401 }
402}
403
404fn handle_event(
405 event: &StreamingEvent,
406 current_tool_call: &mut Option<ToolCallState>,
407 server_tool_uses: &mut HashMap<usize, ServerToolUseState>,
408 current_thinking: &mut Option<ThinkingState>,
409) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
410 match event {
411 StreamingEvent::ContentBlockDelta { index, delta } => match delta {
412 ContentDelta::TextDelta { text } => {
413 if current_tool_call.is_none() {
414 return Some(Ok(RawStreamingChoice::Message(text.clone())));
415 }
416 None
417 }
418 ContentDelta::InputJsonDelta { partial_json } => {
419 if let Some(server_tool_use) = server_tool_uses.get_mut(index) {
420 server_tool_use.input_json.push_str(partial_json);
421 return None;
422 }
423
424 if let Some(tool_call) = current_tool_call {
425 tool_call.input_json.push_str(partial_json);
426 return Some(Ok(RawStreamingChoice::ToolCallDelta {
428 id: tool_call.id.clone(),
429 internal_call_id: tool_call.internal_call_id.clone(),
430 content: ToolCallDeltaContent::Delta(partial_json.clone()),
431 }));
432 }
433 None
434 }
435 ContentDelta::ThinkingDelta { thinking } => {
436 current_thinking
437 .get_or_insert_with(ThinkingState::default)
438 .thinking
439 .push_str(thinking);
440
441 Some(Ok(RawStreamingChoice::ReasoningDelta {
442 id: None,
443 reasoning: thinking.clone(),
444 }))
445 }
446 ContentDelta::SignatureDelta { signature } => {
447 current_thinking
448 .get_or_insert_with(ThinkingState::default)
449 .signature
450 .push_str(signature);
451
452 None
454 }
455 ContentDelta::CitationsDelta { citation } => {
456 Some(Ok(RawStreamingChoice::TextAdditionalParams(json!({
457 "citations": [citation]
458 }))))
459 }
460 ContentDelta::Unknown => None,
461 },
462 StreamingEvent::ContentBlockStart {
463 index,
464 content_block,
465 } => match content_block {
466 Content::Text { citations, .. } => {
467 let additional_params = (!citations.is_empty()).then(|| {
468 json!({
469 "citations": citations
470 })
471 });
472 Some(Ok(RawStreamingChoice::TextStart { additional_params }))
473 }
474 Content::ServerToolUse { id, name, input } => {
475 server_tool_uses.insert(
476 *index,
477 ServerToolUseState {
478 name: name.clone(),
479 id: id.clone(),
480 initial_input: input.clone(),
481 input_json: String::new(),
482 },
483 );
484 None
485 }
486 raw @ Content::WebSearchToolResult { .. } => Some(Ok(RawStreamingChoice::TextStart {
487 additional_params: Some(json!({
488 super::completion::ANTHROPIC_RAW_CONTENT_KEY: raw
489 })),
490 })),
491 Content::ToolUse { id, name, .. } => {
492 let internal_call_id = nanoid::nanoid!();
493 *current_tool_call = Some(ToolCallState {
494 name: name.clone(),
495 id: id.clone(),
496 internal_call_id: internal_call_id.clone(),
497 input_json: String::new(),
498 });
499 Some(Ok(RawStreamingChoice::ToolCallDelta {
500 id: id.clone(),
501 internal_call_id,
502 content: ToolCallDeltaContent::Name(name.clone()),
503 }))
504 }
505 Content::Thinking { .. } => {
506 *current_thinking = Some(ThinkingState::default());
507 None
508 }
509 Content::RedactedThinking { data } => Some(Ok(RawStreamingChoice::Reasoning {
510 id: None,
511 content: ReasoningContent::Redacted { data: data.clone() },
512 })),
513 _ => None,
515 },
516 StreamingEvent::ContentBlockStop { index } => {
517 if let Some(thinking_state) = Option::take(current_thinking)
518 && !thinking_state.thinking.is_empty()
519 {
520 let signature = if thinking_state.signature.is_empty() {
521 None
522 } else {
523 Some(thinking_state.signature)
524 };
525
526 return Some(Ok(RawStreamingChoice::Reasoning {
527 id: None,
528 content: ReasoningContent::Text {
529 text: thinking_state.thinking,
530 signature,
531 },
532 }));
533 }
534
535 if let Some(server_tool_use) = server_tool_uses.remove(index) {
536 let input = if server_tool_use.input_json.is_empty() {
537 if server_tool_use.initial_input.is_null() {
538 json!({})
539 } else {
540 server_tool_use.initial_input
541 }
542 } else {
543 match serde_json::from_str(&server_tool_use.input_json) {
544 Ok(json_value) => json_value,
545 Err(e) => return Some(Err(CompletionError::from(e))),
546 }
547 };
548
549 return Some(Ok(RawStreamingChoice::TextStart {
550 additional_params: Some(json!({
551 super::completion::ANTHROPIC_RAW_CONTENT_KEY: Content::ServerToolUse {
552 id: server_tool_use.id,
553 name: server_tool_use.name,
554 input,
555 }
556 })),
557 }));
558 }
559
560 if let Some(tool_call) = Option::take(current_tool_call) {
561 let json_str = if tool_call.input_json.is_empty() {
562 "{}"
563 } else {
564 &tool_call.input_json
565 };
566 match serde_json::from_str(json_str) {
567 Ok(json_value) => {
568 let raw_tool_call =
569 RawStreamingToolCall::new(tool_call.id, tool_call.name, json_value)
570 .with_internal_call_id(tool_call.internal_call_id);
571 Some(Ok(RawStreamingChoice::ToolCall(raw_tool_call)))
572 }
573 Err(e) => Some(Err(CompletionError::from(e))),
574 }
575 } else {
576 None
577 }
578 }
579 StreamingEvent::MessageStart { .. }
581 | StreamingEvent::MessageDelta { .. }
582 | StreamingEvent::MessageStop
583 | StreamingEvent::Ping
584 | StreamingEvent::Unknown => None,
585 }
586}
587
588#[cfg(test)]
589mod tests {
590 use super::super::completion::{CacheControl, CacheTtl};
591 use super::*;
592 use async_stream::stream;
593 use futures::StreamExt;
594
595 #[cfg(not(all(feature = "wasm", target_arch = "wasm32")))]
596 fn to_stream_result(
597 stream: impl futures::Stream<
598 Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
599 > + Send
600 + 'static,
601 ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
602 Box::pin(stream)
603 }
604
605 #[cfg(all(feature = "wasm", target_arch = "wasm32"))]
606 fn to_stream_result(
607 stream: impl futures::Stream<
608 Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
609 > + 'static,
610 ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
611 Box::pin(stream)
612 }
613
614 #[test]
615 fn test_streaming_tool_build_marks_final_combined_tool() {
616 let mut additional_params = json!({
617 "tools": [{
618 "name": "provider_tool",
619 "description": "Provider tool",
620 "input_schema": {"type": "object"}
621 }]
622 });
623
624 let mut tools = build_tool_definitions(
625 vec![crate::completion::ToolDefinition {
626 name: "rig_tool".to_string(),
627 description: "Rig tool".to_string(),
628 parameters: json!({"type": "object", "properties": {}}),
629 }],
630 &mut additional_params,
631 )
632 .unwrap();
633 let mut system: Vec<SystemContent> = Vec::new();
634 let mut messages: Vec<Message> = Vec::new();
635 apply_prompt_cache_control(&mut system, &mut messages, &mut tools, true, None).unwrap();
636
637 assert_eq!(tools.len(), 2);
638 assert!(tools[0].get("cache_control").is_none());
639 assert_eq!(tools[1]["name"], "provider_tool");
640 assert_eq!(tools[1]["cache_control"]["type"], "ephemeral");
641 }
642
643 #[test]
644 fn test_streaming_prompt_cache_control_uses_raw_top_level_ttl() {
645 let mut additional_params = json!({
646 "cache_control": {"type": "ephemeral", "ttl": "1h"}
647 });
648 let top_level_cache_control =
649 resolve_top_level_cache_control(false, None, &mut additional_params).unwrap();
650 let mut tools = build_tool_definitions(
651 vec![crate::completion::ToolDefinition {
652 name: "rig_tool".to_string(),
653 description: "Rig tool".to_string(),
654 parameters: json!({"type": "object", "properties": {}}),
655 }],
656 &mut additional_params,
657 )
658 .unwrap();
659 let mut system = vec![SystemContent::Text {
660 text: "System prompt".to_string(),
661 cache_control: None,
662 }];
663 let mut messages: Vec<Message> = Vec::new();
664
665 apply_prompt_cache_control(
666 &mut system,
667 &mut messages,
668 &mut tools,
669 true,
670 top_level_cache_control.as_ref(),
671 )
672 .unwrap();
673
674 assert_eq!(tools[0]["cache_control"]["type"], "ephemeral");
675 assert_eq!(tools[0]["cache_control"]["ttl"], "1h");
676 match &system[0] {
677 SystemContent::Text {
678 cache_control: Some(CacheControl::Ephemeral { ttl }),
679 ..
680 } => assert_eq!(ttl.as_ref(), Some(&CacheTtl::OneHour)),
681 other => panic!("expected system cache_control, got {other:?}"),
682 }
683 assert!(additional_params.get("cache_control").is_none());
684 }
685
686 fn handle_event(
687 event: &StreamingEvent,
688 current_tool_call: &mut Option<ToolCallState>,
689 current_thinking: &mut Option<ThinkingState>,
690 ) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
691 let mut server_tool_uses = HashMap::new();
692 super::handle_event(
693 event,
694 current_tool_call,
695 &mut server_tool_uses,
696 current_thinking,
697 )
698 }
699
700 #[test]
701 fn test_thinking_delta_deserialization() {
702 let json = r#"{"type": "thinking_delta", "thinking": "Let me think about this..."}"#;
703 let delta: ContentDelta = serde_json::from_str(json).unwrap();
704
705 match delta {
706 ContentDelta::ThinkingDelta { thinking } => {
707 assert_eq!(thinking, "Let me think about this...");
708 }
709 _ => panic!("Expected ThinkingDelta variant"),
710 }
711 }
712
713 #[test]
714 fn test_signature_delta_deserialization() {
715 let json = r#"{"type": "signature_delta", "signature": "abc123def456"}"#;
716 let delta: ContentDelta = serde_json::from_str(json).unwrap();
717
718 match delta {
719 ContentDelta::SignatureDelta { signature } => {
720 assert_eq!(signature, "abc123def456");
721 }
722 _ => panic!("Expected SignatureDelta variant"),
723 }
724 }
725
726 #[test]
727 fn test_thinking_delta_streaming_event_deserialization() {
728 let json = r#"{
729 "type": "content_block_delta",
730 "index": 0,
731 "delta": {
732 "type": "thinking_delta",
733 "thinking": "First, I need to understand the problem."
734 }
735 }"#;
736
737 let event: StreamingEvent = serde_json::from_str(json).unwrap();
738
739 match event {
740 StreamingEvent::ContentBlockDelta { index, delta } => {
741 assert_eq!(index, 0);
742 match delta {
743 ContentDelta::ThinkingDelta { thinking } => {
744 assert_eq!(thinking, "First, I need to understand the problem.");
745 }
746 _ => panic!("Expected ThinkingDelta"),
747 }
748 }
749 _ => panic!("Expected ContentBlockDelta event"),
750 }
751 }
752
753 #[test]
754 fn test_signature_delta_streaming_event_deserialization() {
755 let json = r#"{
756 "type": "content_block_delta",
757 "index": 0,
758 "delta": {
759 "type": "signature_delta",
760 "signature": "ErUBCkYICBgCIkCaGbqC85F4"
761 }
762 }"#;
763
764 let event: StreamingEvent = serde_json::from_str(json).unwrap();
765
766 match event {
767 StreamingEvent::ContentBlockDelta { index, delta } => {
768 assert_eq!(index, 0);
769 match delta {
770 ContentDelta::SignatureDelta { signature } => {
771 assert_eq!(signature, "ErUBCkYICBgCIkCaGbqC85F4");
772 }
773 _ => panic!("Expected SignatureDelta"),
774 }
775 }
776 _ => panic!("Expected ContentBlockDelta event"),
777 }
778 }
779
780 #[test]
781 fn test_handle_thinking_delta_event() {
782 let event = StreamingEvent::ContentBlockDelta {
783 index: 0,
784 delta: ContentDelta::ThinkingDelta {
785 thinking: "Analyzing the request...".to_string(),
786 },
787 };
788
789 let mut tool_call_state = None;
790 let mut thinking_state = None;
791 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
792
793 assert!(result.is_some());
794 let choice = result.unwrap().unwrap();
795
796 match choice {
797 RawStreamingChoice::ReasoningDelta { id, reasoning, .. } => {
798 assert_eq!(id, None);
799 assert_eq!(reasoning, "Analyzing the request...");
800 }
801 _ => panic!("Expected ReasoningDelta choice"),
802 }
803
804 assert!(thinking_state.is_some());
806 assert_eq!(thinking_state.unwrap().thinking, "Analyzing the request...");
807 }
808
809 #[test]
810 fn test_handle_signature_delta_event() {
811 let event = StreamingEvent::ContentBlockDelta {
812 index: 0,
813 delta: ContentDelta::SignatureDelta {
814 signature: "test_signature".to_string(),
815 },
816 };
817
818 let mut tool_call_state = None;
819 let mut thinking_state = None;
820 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
821
822 assert!(result.is_none());
824
825 assert!(thinking_state.is_some());
827 assert_eq!(thinking_state.unwrap().signature, "test_signature");
828 }
829
830 #[test]
831 fn test_handle_redacted_thinking_content_block_start_event() {
832 let event = StreamingEvent::ContentBlockStart {
833 index: 0,
834 content_block: Content::RedactedThinking {
835 data: "redacted_blob".to_string(),
836 },
837 };
838 let mut tool_call_state = None;
839 let mut thinking_state = None;
840 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
841
842 assert!(result.is_some());
843 match result.unwrap().unwrap() {
844 RawStreamingChoice::Reasoning {
845 content: ReasoningContent::Redacted { data },
846 ..
847 } => {
848 assert_eq!(data, "redacted_blob");
849 }
850 _ => panic!("Expected Redacted reasoning chunk"),
851 }
852 }
853
854 #[test]
855 fn test_handle_text_delta_event() {
856 let event = StreamingEvent::ContentBlockDelta {
857 index: 0,
858 delta: ContentDelta::TextDelta {
859 text: "Hello, world!".to_string(),
860 },
861 };
862
863 let mut tool_call_state = None;
864 let mut thinking_state = None;
865 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
866
867 assert!(result.is_some());
868 let choice = result.unwrap().unwrap();
869
870 match choice {
871 RawStreamingChoice::Message(text) => {
872 assert_eq!(text, "Hello, world!");
873 }
874 _ => panic!("Expected Message choice"),
875 }
876 }
877
878 #[test]
879 fn test_handle_text_block_start_event() {
880 let event = StreamingEvent::ContentBlockStart {
881 index: 0,
882 content_block: Content::Text {
883 text: String::new(),
884 citations: Vec::new(),
885 cache_control: None,
886 },
887 };
888
889 let mut tool_call_state = None;
890 let mut thinking_state = None;
891 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
892
893 assert!(result.is_some());
894 let choice = result.unwrap().unwrap();
895 assert!(matches!(
896 choice,
897 RawStreamingChoice::TextStart {
898 additional_params: None
899 }
900 ));
901 }
902
903 #[test]
904 fn test_thinking_delta_does_not_interfere_with_tool_calls() {
905 let event = StreamingEvent::ContentBlockDelta {
907 index: 0,
908 delta: ContentDelta::ThinkingDelta {
909 thinking: "Thinking while tool is active...".to_string(),
910 },
911 };
912
913 let mut tool_call_state = Some(ToolCallState {
914 name: "test_tool".to_string(),
915 id: "tool_123".to_string(),
916 internal_call_id: nanoid::nanoid!(),
917 input_json: String::new(),
918 });
919 let mut thinking_state = None;
920
921 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
922
923 assert!(result.is_some());
924 let choice = result.unwrap().unwrap();
925
926 match choice {
927 RawStreamingChoice::ReasoningDelta { reasoning, .. } => {
928 assert_eq!(reasoning, "Thinking while tool is active...");
929 }
930 _ => panic!("Expected ReasoningDelta choice"),
931 }
932
933 assert!(tool_call_state.is_some());
935 }
936
937 #[test]
938 fn test_handle_input_json_delta_event() {
939 let event = StreamingEvent::ContentBlockDelta {
940 index: 0,
941 delta: ContentDelta::InputJsonDelta {
942 partial_json: "{\"arg\":\"value".to_string(),
943 },
944 };
945
946 let mut tool_call_state = Some(ToolCallState {
947 name: "test_tool".to_string(),
948 id: "tool_123".to_string(),
949 internal_call_id: nanoid::nanoid!(),
950 input_json: String::new(),
951 });
952 let mut thinking_state = None;
953
954 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
955
956 assert!(result.is_some());
958 let choice = result.unwrap().unwrap();
959
960 match choice {
961 RawStreamingChoice::ToolCallDelta {
962 id,
963 internal_call_id: _,
964 content,
965 } => {
966 assert_eq!(id, "tool_123");
967 match content {
968 ToolCallDeltaContent::Delta(delta) => assert_eq!(delta, "{\"arg\":\"value"),
969 _ => panic!("Expected Delta content"),
970 }
971 }
972 _ => panic!("Expected ToolCallDelta choice, got {:?}", choice),
973 }
974
975 assert!(tool_call_state.is_some());
977 let state = tool_call_state.unwrap();
978 assert_eq!(state.input_json, "{\"arg\":\"value");
979 }
980
981 #[test]
982 fn test_tool_call_accumulation_with_multiple_deltas() {
983 let mut tool_call_state = Some(ToolCallState {
984 name: "test_tool".to_string(),
985 id: "tool_123".to_string(),
986 internal_call_id: nanoid::nanoid!(),
987 input_json: String::new(),
988 });
989 let mut thinking_state = None;
990
991 let event1 = StreamingEvent::ContentBlockDelta {
993 index: 0,
994 delta: ContentDelta::InputJsonDelta {
995 partial_json: "{\"location\":".to_string(),
996 },
997 };
998 let result1 = handle_event(&event1, &mut tool_call_state, &mut thinking_state);
999 assert!(result1.is_some());
1000
1001 let event2 = StreamingEvent::ContentBlockDelta {
1003 index: 0,
1004 delta: ContentDelta::InputJsonDelta {
1005 partial_json: "\"Paris\",".to_string(),
1006 },
1007 };
1008 let result2 = handle_event(&event2, &mut tool_call_state, &mut thinking_state);
1009 assert!(result2.is_some());
1010
1011 let event3 = StreamingEvent::ContentBlockDelta {
1013 index: 0,
1014 delta: ContentDelta::InputJsonDelta {
1015 partial_json: "\"temp\":\"20C\"}".to_string(),
1016 },
1017 };
1018 let result3 = handle_event(&event3, &mut tool_call_state, &mut thinking_state);
1019 assert!(result3.is_some());
1020
1021 assert!(tool_call_state.is_some());
1023 let state = tool_call_state.as_ref().unwrap();
1024 assert_eq!(
1025 state.input_json,
1026 "{\"location\":\"Paris\",\"temp\":\"20C\"}"
1027 );
1028
1029 let stop_event = StreamingEvent::ContentBlockStop { index: 0 };
1031 let final_result = handle_event(&stop_event, &mut tool_call_state, &mut thinking_state);
1032 assert!(final_result.is_some());
1033
1034 match final_result.unwrap().unwrap() {
1035 RawStreamingChoice::ToolCall(RawStreamingToolCall {
1036 id,
1037 name,
1038 arguments,
1039 ..
1040 }) => {
1041 assert_eq!(id, "tool_123");
1042 assert_eq!(name, "test_tool");
1043 assert_eq!(
1044 arguments.get("location").unwrap().as_str().unwrap(),
1045 "Paris"
1046 );
1047 assert_eq!(arguments.get("temp").unwrap().as_str().unwrap(), "20C");
1048 }
1049 other => panic!("Expected ToolCall, got {:?}", other),
1050 }
1051
1052 assert!(tool_call_state.is_none());
1054 }
1055
1056 #[test]
1057 fn test_citations_delta_streaming_event_deserialization() {
1058 let json = r#"{
1059 "type": "content_block_delta",
1060 "index": 0,
1061 "delta": {
1062 "type": "citations_delta",
1063 "citation": {
1064 "type": "char_location",
1065 "cited_text": "The grass is green.",
1066 "document_index": 0,
1067 "document_title": "Example",
1068 "start_char_index": 0,
1069 "end_char_index": 20
1070 }
1071 }
1072 }"#;
1073
1074 let event: StreamingEvent = serde_json::from_str(json).unwrap();
1075 let StreamingEvent::ContentBlockDelta { index, delta } = event else {
1076 panic!("expected ContentBlockDelta");
1077 };
1078 assert_eq!(index, 0);
1079 let ContentDelta::CitationsDelta { citation } = delta else {
1080 panic!("expected CitationsDelta");
1081 };
1082 let crate::providers::anthropic::completion::Citation::CharLocation {
1083 start_char_index,
1084 end_char_index,
1085 ..
1086 } = citation
1087 else {
1088 panic!("expected CharLocation");
1089 };
1090 assert_eq!(start_char_index, 0);
1091 assert_eq!(end_char_index, 20);
1092 }
1093
1094 #[test]
1095 fn test_search_result_citations_delta_streaming_event_deserialization() {
1096 let json = r#"{
1097 "type": "content_block_delta",
1098 "index": 0,
1099 "delta": {
1100 "type": "citations_delta",
1101 "citation": {
1102 "type": "search_result_location",
1103 "cited_text": "API requests require a key.",
1104 "source": "https://docs.example.com/api-reference",
1105 "title": "API Reference",
1106 "search_result_index": 0,
1107 "start_block_index": 0,
1108 "end_block_index": 1
1109 }
1110 }
1111 }"#;
1112
1113 let event: StreamingEvent = serde_json::from_str(json).unwrap();
1114 let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1115 panic!("expected ContentBlockDelta");
1116 };
1117 let ContentDelta::CitationsDelta { citation } = delta else {
1118 panic!("expected CitationsDelta");
1119 };
1120 assert!(matches!(
1121 citation,
1122 crate::providers::anthropic::completion::Citation::SearchResultLocation {
1123 search_result_index: 0,
1124 start_block_index: 0,
1125 end_block_index: 1,
1126 ..
1127 }
1128 ));
1129 }
1130
1131 #[test]
1132 fn test_web_search_result_citations_delta_streaming_event_deserialization() {
1133 let json = r#"{
1134 "type": "content_block_delta",
1135 "index": 0,
1136 "delta": {
1137 "type": "citations_delta",
1138 "citation": {
1139 "type": "web_search_result_location",
1140 "cited_text": "Claude Shannon was a mathematician.",
1141 "url": "https://example.com/shannon",
1142 "title": "Claude Shannon",
1143 "encrypted_index": "encrypted-reference"
1144 }
1145 }
1146 }"#;
1147
1148 let event: StreamingEvent = serde_json::from_str(json).unwrap();
1149 let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1150 panic!("expected ContentBlockDelta");
1151 };
1152 let ContentDelta::CitationsDelta { citation } = delta else {
1153 panic!("expected CitationsDelta");
1154 };
1155 assert!(matches!(
1156 citation,
1157 crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1158 ref url,
1159 ref encrypted_index,
1160 ..
1161 } if url == "https://example.com/shannon"
1162 && encrypted_index == "encrypted-reference"
1163 ));
1164 }
1165
1166 #[test]
1167 fn test_web_search_result_citations_delta_allows_null_title() {
1168 let json = r#"{
1169 "type": "content_block_delta",
1170 "index": 0,
1171 "delta": {
1172 "type": "citations_delta",
1173 "citation": {
1174 "type": "web_search_result_location",
1175 "cited_text": "Claude Shannon was a mathematician.",
1176 "url": "https://example.com/shannon",
1177 "title": null,
1178 "encrypted_index": "encrypted-reference"
1179 }
1180 }
1181 }"#;
1182
1183 let event: StreamingEvent = serde_json::from_str(json).unwrap();
1184 let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1185 panic!("expected ContentBlockDelta");
1186 };
1187 let ContentDelta::CitationsDelta { citation } = delta else {
1188 panic!("expected CitationsDelta");
1189 };
1190 assert!(matches!(
1191 citation,
1192 crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1193 title: None,
1194 ..
1195 }
1196 ));
1197 }
1198
1199 #[test]
1200 fn test_web_search_content_block_start_events_deserialize() {
1201 let server_tool_use = r#"{
1202 "type": "content_block_start",
1203 "index": 1,
1204 "content_block": {
1205 "type": "server_tool_use",
1206 "id": "srvtoolu_01",
1207 "name": "web_search",
1208 "input": {
1209 "query": "claude shannon birth date"
1210 }
1211 }
1212 }"#;
1213 let event: StreamingEvent = serde_json::from_str(server_tool_use).unwrap();
1214 assert!(matches!(
1215 event,
1216 StreamingEvent::ContentBlockStart {
1217 content_block: Content::ServerToolUse {
1218 ref id,
1219 ref name,
1220 ref input
1221 },
1222 ..
1223 } if id == "srvtoolu_01"
1224 && name == "web_search"
1225 && input["query"] == "claude shannon birth date"
1226 ));
1227
1228 let web_search_tool_result = r#"{
1229 "type": "content_block_start",
1230 "index": 2,
1231 "content_block": {
1232 "type": "web_search_tool_result",
1233 "tool_use_id": "srvtoolu_01",
1234 "content": [{
1235 "type": "web_search_result",
1236 "url": "https://example.com/shannon",
1237 "title": "Claude Shannon",
1238 "encrypted_content": "encrypted-content"
1239 }]
1240 }
1241 }"#;
1242 let event: StreamingEvent = serde_json::from_str(web_search_tool_result).unwrap();
1243 assert!(matches!(
1244 event,
1245 StreamingEvent::ContentBlockStart {
1246 content_block: Content::WebSearchToolResult {
1247 ref tool_use_id,
1248 ref content
1249 },
1250 ..
1251 } if tool_use_id == "srvtoolu_01"
1252 && content[0]["encrypted_content"] == "encrypted-content"
1253 ));
1254 }
1255
1256 #[tokio::test]
1257 async fn test_streaming_web_search_blocks_are_preserved_on_final_choice() {
1258 let raw_stream = stream! {
1259 let mut tool_call_state = None;
1260 let mut server_tool_uses = HashMap::new();
1261 let mut thinking_state = None;
1262
1263 let server_tool_use_start = super::handle_event(
1264 &StreamingEvent::ContentBlockStart {
1265 index: 0,
1266 content_block: Content::ServerToolUse {
1267 id: "srvtoolu_01".to_string(),
1268 name: "web_search".to_string(),
1269 input: serde_json::Value::Null,
1270 },
1271 },
1272 &mut tool_call_state,
1273 &mut server_tool_uses,
1274 &mut thinking_state,
1275 );
1276 assert!(
1277 server_tool_use_start.is_none(),
1278 "server_tool_use start should be accumulated until its input JSON is complete"
1279 );
1280
1281 let server_tool_use_delta = super::handle_event(
1282 &StreamingEvent::ContentBlockDelta {
1283 index: 0,
1284 delta: ContentDelta::InputJsonDelta {
1285 partial_json: r#"{"query":"claude shannon birth date"}"#.to_string(),
1286 },
1287 },
1288 &mut tool_call_state,
1289 &mut server_tool_uses,
1290 &mut thinking_state,
1291 );
1292 assert!(
1293 server_tool_use_delta.is_none(),
1294 "server_tool_use input JSON should not be emitted as a Rig tool-call delta"
1295 );
1296
1297 yield super::handle_event(
1298 &StreamingEvent::ContentBlockStop { index: 0 },
1299 &mut tool_call_state,
1300 &mut server_tool_uses,
1301 &mut thinking_state,
1302 )
1303 .expect("server_tool_use stop should produce completed raw metadata");
1304
1305 yield super::handle_event(
1306 &StreamingEvent::ContentBlockStart {
1307 index: 1,
1308 content_block: Content::WebSearchToolResult {
1309 tool_use_id: "srvtoolu_01".to_string(),
1310 content: serde_json::json!([{
1311 "type": "web_search_result",
1312 "url": "https://example.com/shannon",
1313 "title": "Claude Shannon",
1314 "encrypted_content": "encrypted-content"
1315 }]),
1316 },
1317 },
1318 &mut tool_call_state,
1319 &mut server_tool_uses,
1320 &mut thinking_state,
1321 )
1322 .expect("web_search_tool_result block should produce raw metadata");
1323
1324 yield super::handle_event(
1325 &StreamingEvent::ContentBlockStart {
1326 index: 2,
1327 content_block: Content::Text {
1328 text: String::new(),
1329 citations: Vec::new(),
1330 cache_control: None,
1331 },
1332 },
1333 &mut tool_call_state,
1334 &mut server_tool_uses,
1335 &mut thinking_state,
1336 )
1337 .expect("text block start should produce a raw choice");
1338
1339 yield super::handle_event(
1340 &StreamingEvent::ContentBlockDelta {
1341 index: 2,
1342 delta: ContentDelta::TextDelta {
1343 text: "Claude Shannon was born on April 30, 1916.".to_string(),
1344 },
1345 },
1346 &mut tool_call_state,
1347 &mut server_tool_uses,
1348 &mut thinking_state,
1349 )
1350 .expect("text delta should produce a raw choice");
1351
1352 yield super::handle_event(
1353 &StreamingEvent::ContentBlockDelta {
1354 index: 2,
1355 delta: ContentDelta::CitationsDelta {
1356 citation: crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1357 cited_text: "Claude Shannon was born on April 30, 1916.".to_string(),
1358 url: "https://example.com/shannon".to_string(),
1359 title: Some("Claude Shannon".to_string()),
1360 encrypted_index: "encrypted-index".to_string(),
1361 },
1362 },
1363 },
1364 &mut tool_call_state,
1365 &mut server_tool_uses,
1366 &mut thinking_state,
1367 )
1368 .expect("citation delta should produce a raw choice");
1369
1370 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1371 usage: PartialUsage::default(),
1372 }));
1373 };
1374
1375 let mut stream =
1376 crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1377 while stream.next().await.is_some() {}
1378
1379 let choice_items: Vec<crate::message::AssistantContent> =
1380 stream.choice.clone().into_iter().collect();
1381 assert_eq!(choice_items.len(), 3);
1382 assert!(
1383 choice_items
1384 .iter()
1385 .all(|item| !matches!(item, crate::message::AssistantContent::ToolCall(_))),
1386 "provider-owned web-search blocks must not become Rig client tool calls"
1387 );
1388
1389 let Some(crate::message::AssistantContent::Text(server_tool_use)) = choice_items.first()
1390 else {
1391 panic!("expected raw server_tool_use metadata");
1392 };
1393 assert_eq!(
1394 server_tool_use.additional_params.as_ref().unwrap()
1395 [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["type"],
1396 "server_tool_use"
1397 );
1398 assert_eq!(
1399 server_tool_use.additional_params.as_ref().unwrap()
1400 [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["input"]["query"],
1401 "claude shannon birth date"
1402 );
1403
1404 let Some(crate::message::AssistantContent::Text(web_search_result)) = choice_items.get(1)
1405 else {
1406 panic!("expected raw web_search_tool_result metadata");
1407 };
1408 assert_eq!(
1409 web_search_result.additional_params.as_ref().unwrap()
1410 [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["content"][0]
1411 ["encrypted_content"],
1412 "encrypted-content"
1413 );
1414
1415 let Some(crate::message::AssistantContent::Text(answer)) = choice_items.get(2) else {
1416 panic!("expected answer text");
1417 };
1418 assert_eq!(answer.text, "Claude Shannon was born on April 30, 1916.");
1419 let citations = crate::providers::anthropic::completion::anthropic_citations(answer)
1420 .expect("expected preserved citations");
1421 assert!(matches!(
1422 citations.first(),
1423 Some(crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1424 encrypted_index,
1425 ..
1426 }) if encrypted_index == "encrypted-index"
1427 ));
1428 }
1429
1430 #[test]
1431 fn test_handle_citations_delta_event_preserves_metadata() {
1432 let event = StreamingEvent::ContentBlockDelta {
1433 index: 0,
1434 delta: ContentDelta::CitationsDelta {
1435 citation: crate::providers::anthropic::completion::Citation::CharLocation {
1436 cited_text: "The grass is green.".to_string(),
1437 document_index: 0,
1438 document_title: Some("Example".to_string()),
1439 start_char_index: 0,
1440 end_char_index: 20,
1441 },
1442 },
1443 };
1444
1445 let mut tool_call_state = None;
1446 let mut thinking_state = None;
1447 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
1448
1449 assert!(result.is_some());
1450 let choice = result.unwrap().unwrap();
1451 let RawStreamingChoice::TextAdditionalParams(additional_params) = choice else {
1452 panic!("expected TextAdditionalParams choice");
1453 };
1454 assert_eq!(additional_params["citations"][0]["type"], "char_location");
1455 }
1456
1457 #[tokio::test]
1458 async fn test_streaming_citation_deltas_are_preserved_on_final_text() {
1459 let citation = crate::providers::anthropic::completion::Citation::CharLocation {
1460 cited_text: "The grass is green.".to_string(),
1461 document_index: 0,
1462 document_title: Some("Example".to_string()),
1463 start_char_index: 0,
1464 end_char_index: 20,
1465 };
1466
1467 let raw_stream = stream! {
1468 let mut tool_call_state = None;
1469 let mut thinking_state = None;
1470
1471 yield handle_event(
1472 &StreamingEvent::ContentBlockStart {
1473 index: 0,
1474 content_block: Content::Text {
1475 text: String::new(),
1476 citations: Vec::new(),
1477 cache_control: None,
1478 },
1479 },
1480 &mut tool_call_state,
1481 &mut thinking_state,
1482 )
1483 .expect("text block start should produce a raw choice");
1484
1485 yield handle_event(
1486 &StreamingEvent::ContentBlockDelta {
1487 index: 0,
1488 delta: ContentDelta::TextDelta {
1489 text: "the grass is green".to_string(),
1490 },
1491 },
1492 &mut tool_call_state,
1493 &mut thinking_state,
1494 )
1495 .expect("text delta should produce a raw choice");
1496
1497 yield handle_event(
1498 &StreamingEvent::ContentBlockDelta {
1499 index: 0,
1500 delta: ContentDelta::CitationsDelta {
1501 citation: crate::providers::anthropic::completion::Citation::CharLocation {
1502 cited_text: "The grass is green.".to_string(),
1503 document_index: 0,
1504 document_title: Some("Example".to_string()),
1505 start_char_index: 0,
1506 end_char_index: 20,
1507 },
1508 },
1509 },
1510 &mut tool_call_state,
1511 &mut thinking_state,
1512 )
1513 .expect("citation delta should produce a raw choice");
1514
1515 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1516 usage: PartialUsage::default(),
1517 }));
1518 };
1519
1520 let mut stream =
1521 crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1522 while stream.next().await.is_some() {}
1523
1524 let choice_items: Vec<crate::message::AssistantContent> =
1525 stream.choice.clone().into_iter().collect();
1526 let Some(crate::message::AssistantContent::Text(text)) = choice_items.first() else {
1527 panic!("expected accumulated text item");
1528 };
1529
1530 assert_eq!(text.text, "the grass is green");
1531 let citations = crate::providers::anthropic::completion::anthropic_citations(text).unwrap();
1532 assert_eq!(citations, vec![citation]);
1533 }
1534
1535 #[test]
1536 fn test_unknown_content_delta_falls_back() {
1537 let json = r#"{"type": "something_new_from_anthropic", "field": "x"}"#;
1538 let delta: ContentDelta = serde_json::from_str(json).unwrap();
1539 assert!(matches!(delta, ContentDelta::Unknown));
1540 }
1541}