1use async_stream::stream;
2use futures::StreamExt;
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{Level, enabled, info_span};
6use tracing_futures::Instrument;
7
8use super::completion::{
9 AnthropicCompatibleProvider, Content, GenericCompletionModel, Message, SystemContent,
10 ToolChoice, Usage, apply_prompt_cache_control, build_tool_definitions,
11 resolve_top_level_cache_control, split_system_messages_from_history,
12};
13use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
14use crate::http_client::sse::{Event, GenericEventSource};
15use crate::http_client::{self, HttpClientExt};
16use crate::json_utils::merge_inplace;
17use crate::message::ReasoningContent;
18use crate::streaming::{
19 self, RawStreamingChoice, RawStreamingToolCall, StreamingResult, ToolCallDeltaContent,
20};
21use crate::telemetry::SpanCombinator;
22use crate::wasm_compat::{WasmCompatSend, WasmCompatSync};
23use std::collections::HashMap;
24
/// One server-sent event payload of a streaming response.
///
/// The variant is selected by the payload's `type` field, matched in
/// `snake_case` (e.g. `content_block_delta`).
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamingEvent {
    /// `message_start`: carries the initial message envelope.
    MessageStart {
        message: MessageStart,
    },
    /// `content_block_start`: a new content block opens at position `index`.
    ContentBlockStart {
        index: usize,
        content_block: Content,
    },
    /// `content_block_delta`: an incremental update for the block at `index`.
    ContentBlockDelta {
        index: usize,
        delta: ContentDelta,
    },
    /// `content_block_stop`: the block at `index` is complete.
    ContentBlockStop {
        index: usize,
    },
    /// `message_delta`: message-level changes (stop information) plus usage counters.
    MessageDelta {
        delta: MessageDelta,
        usage: PartialUsage,
    },
    /// `message_stop`: carries no payload.
    MessageStop,
    /// `ping`: carries no payload.
    Ping,
    /// Catch-all for any `type` value not listed above, so unrecognized events
    /// deserialize successfully instead of producing a parse error.
    #[serde(other)]
    Unknown,
}
51
/// Payload of the `message` field of a [`StreamingEvent::MessageStart`] event.
///
/// The streaming loop in this file reads `id` and `model` (recorded on the
/// tracing span) and `usage.input_tokens` (remembered for the final usage).
#[derive(Debug, Deserialize)]
pub struct MessageStart {
    pub id: String,
    pub role: String,
    pub content: Vec<Content>,
    pub model: String,
    pub stop_reason: Option<String>,
    pub stop_sequence: Option<String>,
    pub usage: Usage,
}
62
/// Incremental update carried by a [`StreamingEvent::ContentBlockDelta`] event.
///
/// The variant is selected by the `type` field, matched in `snake_case`
/// (e.g. `text_delta`, `input_json_delta`).
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentDelta {
    /// A fragment of text.
    TextDelta {
        text: String,
    },
    /// A fragment of a tool input, delivered as partial JSON text.
    InputJsonDelta {
        partial_json: String,
    },
    /// A fragment of thinking text.
    ThinkingDelta {
        thinking: String,
    },
    /// A fragment of the signature attached to a thinking block.
    SignatureDelta {
        signature: String,
    },
    /// A single citation.
    CitationsDelta {
        citation: super::completion::Citation,
    },
    /// Catch-all for any delta `type` not listed above.
    #[serde(other)]
    Unknown,
}
87
/// Payload of the `delta` field of a [`StreamingEvent::MessageDelta`] event.
///
/// The streaming loop in this file treats a present `stop_reason` as the
/// signal that the response has finished.
#[derive(Debug, Deserialize)]
pub struct MessageDelta {
    pub stop_reason: Option<String>,
    pub stop_sequence: Option<String>,
}
93
/// Token counters as reported in a streaming `message_delta` event.
///
/// Only `output_tokens` is required when deserializing; the remaining
/// counters fall back to `None` when the field is absent.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
pub struct PartialUsage {
    pub output_tokens: usize,
    #[serde(default)]
    pub input_tokens: Option<usize>,
    #[serde(default)]
    pub cache_creation_input_tokens: Option<u64>,
    #[serde(default)]
    pub cache_read_input_tokens: Option<u64>,
}
104
105impl GetTokenUsage for PartialUsage {
106 fn token_usage(&self) -> Option<crate::completion::Usage> {
107 let mut usage = crate::completion::Usage::new();
108
109 usage.input_tokens = self.input_tokens.unwrap_or_default() as u64;
110 usage.output_tokens = self.output_tokens as u64;
111 usage.cached_input_tokens = self.cache_read_input_tokens.unwrap_or(0);
112 usage.cache_creation_input_tokens = self.cache_creation_input_tokens.unwrap_or(0);
113 usage.total_tokens = usage.input_tokens
114 + usage.cached_input_tokens
115 + usage.cache_creation_input_tokens
116 + usage.output_tokens;
117 Some(usage)
118 }
119}
120
/// Accumulator for the client tool call that is currently being streamed.
#[derive(Default)]
struct ToolCallState {
    /// Tool name taken from the `tool_use` block start.
    name: String,
    /// Tool-use id taken from the `tool_use` block start.
    id: String,
    /// Locally generated id (via `nanoid`) attached to every chunk of this call.
    internal_call_id: String,
    /// Concatenation of the `partial_json` fragments received so far.
    input_json: String,
}
128
/// Accumulator for a `server_tool_use` block, stored per content-block index.
struct ServerToolUseState {
    /// Tool name taken from the block start.
    name: String,
    /// Tool-use id taken from the block start.
    id: String,
    /// Input value supplied with the block start; used when no JSON deltas arrive.
    initial_input: Value,
    /// Concatenation of the `partial_json` fragments received for this block.
    input_json: String,
}
135
/// Accumulator for the thinking block that is currently being streamed.
#[derive(Default)]
struct ThinkingState {
    /// Concatenation of the `thinking_delta` fragments received so far.
    thinking: String,
    /// Concatenation of the `signature_delta` fragments received so far.
    signature: String,
}
141
/// Final item yielded by the stream: the usage counters for the whole response.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StreamingCompletionResponse {
    pub usage: PartialUsage,
}
146
147impl GetTokenUsage for StreamingCompletionResponse {
148 fn token_usage(&self) -> Option<crate::completion::Usage> {
149 let mut usage = crate::completion::Usage::new();
150 usage.input_tokens = self.usage.input_tokens.unwrap_or(0) as u64;
151 usage.output_tokens = self.usage.output_tokens as u64;
152 usage.cached_input_tokens = self.usage.cache_read_input_tokens.unwrap_or(0);
153 usage.cache_creation_input_tokens = self.usage.cache_creation_input_tokens.unwrap_or(0);
154 usage.total_tokens = usage.input_tokens
155 + usage.cached_input_tokens
156 + usage.cache_creation_input_tokens
157 + usage.output_tokens;
158
159 Some(usage)
160 }
161}
162
163impl<Ext, T> GenericCompletionModel<Ext, T>
164where
165 T: HttpClientExt + Clone + Default + 'static,
166 Ext: AnthropicCompatibleProvider + Clone + WasmCompatSend + WasmCompatSync + 'static,
167{
168 pub(crate) async fn stream(
169 &self,
170 mut completion_request: CompletionRequest,
171 ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
172 {
173 let request_model = completion_request
174 .model
175 .clone()
176 .unwrap_or_else(|| self.model.clone());
177 let span = if tracing::Span::current().is_disabled() {
178 info_span!(
179 target: "rig::completions",
180 "chat_streaming",
181 gen_ai.operation.name = "chat_streaming",
182 gen_ai.provider.name = Ext::PROVIDER_NAME,
183 gen_ai.request.model = &request_model,
184 gen_ai.system_instructions = &completion_request.preamble,
185 gen_ai.response.id = tracing::field::Empty,
186 gen_ai.response.model = &request_model,
187 gen_ai.usage.output_tokens = tracing::field::Empty,
188 gen_ai.usage.input_tokens = tracing::field::Empty,
189 gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
190 gen_ai.usage.cache_creation.input_tokens = tracing::field::Empty,
191 gen_ai.input.messages = tracing::field::Empty,
192 gen_ai.output.messages = tracing::field::Empty,
193 )
194 } else {
195 tracing::Span::current()
196 };
197 let max_tokens = if let Some(tokens) = completion_request.max_tokens {
198 tokens
199 } else if let Some(tokens) = self.default_max_tokens {
200 tokens
201 } else {
202 return Err(CompletionError::RequestError(
203 "`max_tokens` must be set for Anthropic".into(),
204 ));
205 };
206
207 let mut full_history = vec![];
208 if let Some(docs) = completion_request.normalized_documents() {
209 full_history.push(docs);
210 }
211 full_history.extend(completion_request.chat_history);
212 let (history_system, full_history) = split_system_messages_from_history(full_history);
213
214 let mut messages = full_history
215 .into_iter()
216 .map(Message::try_from)
217 .collect::<Result<Vec<Message>, _>>()?;
218
219 let mut system: Vec<SystemContent> =
221 if let Some(preamble) = completion_request.preamble.as_ref() {
222 if preamble.is_empty() {
223 vec![]
224 } else {
225 vec![SystemContent::Text {
226 text: preamble.clone(),
227 cache_control: None,
228 }]
229 }
230 } else {
231 vec![]
232 };
233 system.extend(history_system);
234
235 let mut additional_params_payload = completion_request
236 .additional_params
237 .take()
238 .unwrap_or(Value::Null);
239 let top_level_cache_control = resolve_top_level_cache_control(
240 self.automatic_caching,
241 self.automatic_caching_ttl.clone(),
242 &mut additional_params_payload,
243 )?;
244 let mut tools =
245 build_tool_definitions(completion_request.tools, &mut additional_params_payload)?;
246
247 apply_prompt_cache_control(
248 &mut system,
249 &mut messages,
250 &mut tools,
251 self.prompt_caching,
252 top_level_cache_control.as_ref(),
253 )?;
254
255 let mut body = json!({
256 "model": request_model,
257 "messages": messages,
258 "max_tokens": max_tokens,
259 "stream": true,
260 });
261
262 if let Some(cache_control) = top_level_cache_control {
265 merge_inplace(
266 &mut body,
267 json!({ "cache_control": serde_json::to_value(&cache_control)? }),
268 );
269 }
270
271 if !system.is_empty() {
273 merge_inplace(&mut body, json!({ "system": system }));
274 }
275
276 if let Some(temperature) = completion_request.temperature {
277 merge_inplace(&mut body, json!({ "temperature": temperature }));
278 }
279
280 if !tools.is_empty() {
281 merge_inplace(
282 &mut body,
283 json!({
284 "tools": tools,
285 "tool_choice": ToolChoice::Auto,
286 }),
287 );
288 }
289
290 if !additional_params_payload.is_null() {
291 merge_inplace(&mut body, additional_params_payload)
292 }
293
294 if enabled!(Level::TRACE) {
295 tracing::trace!(
296 target: "rig::completions",
297 "Anthropic completion request: {}",
298 serde_json::to_string_pretty(&body)?
299 );
300 }
301
302 let body: Vec<u8> = serde_json::to_vec(&body)?;
303
304 let req = self
305 .client
306 .post("/v1/messages")?
307 .body(body)
308 .map_err(http_client::Error::Protocol)?;
309
310 let stream = GenericEventSource::new(self.client.clone(), req);
311
312 let stream: StreamingResult<StreamingCompletionResponse> = Box::pin(stream! {
314 let mut current_tool_call: Option<ToolCallState> = None;
315 let mut server_tool_uses: HashMap<usize, ServerToolUseState> = HashMap::new();
316 let mut current_thinking: Option<ThinkingState> = None;
317 let mut sse_stream = Box::pin(stream);
318 let mut input_tokens = 0;
319 let mut final_usage = None;
320
321 let mut text_content = String::new();
322
323 while let Some(sse_result) = sse_stream.next().await {
324 match sse_result {
325 Ok(Event::Open) => {}
326 Ok(Event::Message(sse)) => {
327 match serde_json::from_str::<StreamingEvent>(&sse.data) {
329 Ok(event) => {
330 match &event {
331 StreamingEvent::MessageStart { message } => {
332 input_tokens = message.usage.input_tokens;
333
334 let span = tracing::Span::current();
335 span.record("gen_ai.response.id", &message.id);
336 span.record("gen_ai.response.model", &message.model);
337 },
338 StreamingEvent::MessageDelta { delta, usage } => {
339 if delta.stop_reason.is_some() {
340 let usage = PartialUsage {
344 output_tokens: usage.output_tokens,
345 input_tokens: usize::try_from(input_tokens).ok(),
346 cache_creation_input_tokens: usage.cache_creation_input_tokens,
347 cache_read_input_tokens: usage.cache_read_input_tokens
348 };
349
350 let span = tracing::Span::current();
351 span.record_token_usage(&usage);
352 final_usage = Some(usage);
353 break;
354 }
355 }
356 _ => {}
357 }
358
359 if let Some(result) = handle_event(
360 &event,
361 &mut current_tool_call,
362 &mut server_tool_uses,
363 &mut current_thinking,
364 ) {
365 if let Ok(RawStreamingChoice::Message(ref text)) = result {
366 text_content += text;
367 }
368 yield result;
369 }
370 },
371 Err(e) => {
372 if !sse.data.trim().is_empty() {
373 yield Err(CompletionError::ResponseError(
374 format!("Failed to parse JSON: {} (Data: {})", e, sse.data)
375 ));
376 }
377 }
378 }
379 },
380 Err(e) => {
381 yield Err(CompletionError::ProviderError(format!("SSE Error: {e}")));
382 break;
383 }
384 }
385 }
386
387 sse_stream.close();
389
390 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
391 usage: final_usage.unwrap_or_default()
392 }))
393 }.instrument(span));
394
395 Ok(streaming::StreamingCompletionResponse::stream(stream))
396 }
397}
398
399fn handle_event(
400 event: &StreamingEvent,
401 current_tool_call: &mut Option<ToolCallState>,
402 server_tool_uses: &mut HashMap<usize, ServerToolUseState>,
403 current_thinking: &mut Option<ThinkingState>,
404) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
405 match event {
406 StreamingEvent::ContentBlockDelta { index, delta } => match delta {
407 ContentDelta::TextDelta { text } => {
408 if current_tool_call.is_none() {
409 return Some(Ok(RawStreamingChoice::Message(text.clone())));
410 }
411 None
412 }
413 ContentDelta::InputJsonDelta { partial_json } => {
414 if let Some(server_tool_use) = server_tool_uses.get_mut(index) {
415 server_tool_use.input_json.push_str(partial_json);
416 return None;
417 }
418
419 if let Some(tool_call) = current_tool_call {
420 tool_call.input_json.push_str(partial_json);
421 return Some(Ok(RawStreamingChoice::ToolCallDelta {
423 id: tool_call.id.clone(),
424 internal_call_id: tool_call.internal_call_id.clone(),
425 content: ToolCallDeltaContent::Delta(partial_json.clone()),
426 }));
427 }
428 None
429 }
430 ContentDelta::ThinkingDelta { thinking } => {
431 current_thinking
432 .get_or_insert_with(ThinkingState::default)
433 .thinking
434 .push_str(thinking);
435
436 Some(Ok(RawStreamingChoice::ReasoningDelta {
437 id: None,
438 reasoning: thinking.clone(),
439 }))
440 }
441 ContentDelta::SignatureDelta { signature } => {
442 current_thinking
443 .get_or_insert_with(ThinkingState::default)
444 .signature
445 .push_str(signature);
446
447 None
449 }
450 ContentDelta::CitationsDelta { citation } => {
451 Some(Ok(RawStreamingChoice::TextAdditionalParams(json!({
452 "citations": [citation]
453 }))))
454 }
455 ContentDelta::Unknown => None,
456 },
457 StreamingEvent::ContentBlockStart {
458 index,
459 content_block,
460 } => match content_block {
461 Content::Text { citations, .. } => {
462 let additional_params = (!citations.is_empty()).then(|| {
463 json!({
464 "citations": citations
465 })
466 });
467 Some(Ok(RawStreamingChoice::TextStart { additional_params }))
468 }
469 Content::ServerToolUse { id, name, input } => {
470 server_tool_uses.insert(
471 *index,
472 ServerToolUseState {
473 name: name.clone(),
474 id: id.clone(),
475 initial_input: input.clone(),
476 input_json: String::new(),
477 },
478 );
479 None
480 }
481 raw @ Content::WebSearchToolResult { .. } => Some(Ok(RawStreamingChoice::TextStart {
482 additional_params: Some(json!({
483 super::completion::ANTHROPIC_RAW_CONTENT_KEY: raw
484 })),
485 })),
486 Content::ToolUse { id, name, .. } => {
487 let internal_call_id = nanoid::nanoid!();
488 *current_tool_call = Some(ToolCallState {
489 name: name.clone(),
490 id: id.clone(),
491 internal_call_id: internal_call_id.clone(),
492 input_json: String::new(),
493 });
494 Some(Ok(RawStreamingChoice::ToolCallDelta {
495 id: id.clone(),
496 internal_call_id,
497 content: ToolCallDeltaContent::Name(name.clone()),
498 }))
499 }
500 Content::Thinking { .. } => {
501 *current_thinking = Some(ThinkingState::default());
502 None
503 }
504 Content::RedactedThinking { data } => Some(Ok(RawStreamingChoice::Reasoning {
505 id: None,
506 content: ReasoningContent::Redacted { data: data.clone() },
507 })),
508 _ => None,
510 },
511 StreamingEvent::ContentBlockStop { index } => {
512 if let Some(thinking_state) = Option::take(current_thinking)
513 && !thinking_state.thinking.is_empty()
514 {
515 let signature = if thinking_state.signature.is_empty() {
516 None
517 } else {
518 Some(thinking_state.signature)
519 };
520
521 return Some(Ok(RawStreamingChoice::Reasoning {
522 id: None,
523 content: ReasoningContent::Text {
524 text: thinking_state.thinking,
525 signature,
526 },
527 }));
528 }
529
530 if let Some(server_tool_use) = server_tool_uses.remove(index) {
531 let input = if server_tool_use.input_json.is_empty() {
532 if server_tool_use.initial_input.is_null() {
533 json!({})
534 } else {
535 server_tool_use.initial_input
536 }
537 } else {
538 match serde_json::from_str(&server_tool_use.input_json) {
539 Ok(json_value) => json_value,
540 Err(e) => return Some(Err(CompletionError::from(e))),
541 }
542 };
543
544 return Some(Ok(RawStreamingChoice::TextStart {
545 additional_params: Some(json!({
546 super::completion::ANTHROPIC_RAW_CONTENT_KEY: Content::ServerToolUse {
547 id: server_tool_use.id,
548 name: server_tool_use.name,
549 input,
550 }
551 })),
552 }));
553 }
554
555 if let Some(tool_call) = Option::take(current_tool_call) {
556 let json_str = if tool_call.input_json.is_empty() {
557 "{}"
558 } else {
559 &tool_call.input_json
560 };
561 match serde_json::from_str(json_str) {
562 Ok(json_value) => {
563 let raw_tool_call =
564 RawStreamingToolCall::new(tool_call.id, tool_call.name, json_value)
565 .with_internal_call_id(tool_call.internal_call_id);
566 Some(Ok(RawStreamingChoice::ToolCall(raw_tool_call)))
567 }
568 Err(e) => Some(Err(CompletionError::from(e))),
569 }
570 } else {
571 None
572 }
573 }
574 StreamingEvent::MessageStart { .. }
576 | StreamingEvent::MessageDelta { .. }
577 | StreamingEvent::MessageStop
578 | StreamingEvent::Ping
579 | StreamingEvent::Unknown => None,
580 }
581}
582
583#[cfg(test)]
584mod tests {
585 use super::super::completion::{CacheControl, CacheTtl};
586 use super::*;
587 use async_stream::stream;
588 use futures::StreamExt;
589
    /// Boxes and pins `stream` into a `StreamingResult`.
    ///
    /// Variant compiled for every target except wasm32 with the `wasm`
    /// feature; here the stream must additionally be `Send`.
    #[cfg(not(all(feature = "wasm", target_arch = "wasm32")))]
    fn to_stream_result(
        stream: impl futures::Stream<
            Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
        > + Send
        + 'static,
    ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
        Box::pin(stream)
    }
599
    /// Boxes and pins `stream` into a `StreamingResult`.
    ///
    /// Variant compiled for wasm32 with the `wasm` feature; no `Send` bound
    /// is required on the stream here.
    #[cfg(all(feature = "wasm", target_arch = "wasm32"))]
    fn to_stream_result(
        stream: impl futures::Stream<
            Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
        > + 'static,
    ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
        Box::pin(stream)
    }
608
609 #[test]
610 fn test_streaming_tool_build_marks_final_combined_tool() {
611 let mut additional_params = json!({
612 "tools": [{
613 "name": "provider_tool",
614 "description": "Provider tool",
615 "input_schema": {"type": "object"}
616 }]
617 });
618
619 let mut tools = build_tool_definitions(
620 vec![crate::completion::ToolDefinition {
621 name: "rig_tool".to_string(),
622 description: "Rig tool".to_string(),
623 parameters: json!({"type": "object", "properties": {}}),
624 }],
625 &mut additional_params,
626 )
627 .unwrap();
628 let mut system: Vec<SystemContent> = Vec::new();
629 let mut messages: Vec<Message> = Vec::new();
630 apply_prompt_cache_control(&mut system, &mut messages, &mut tools, true, None).unwrap();
631
632 assert_eq!(tools.len(), 2);
633 assert!(tools[0].get("cache_control").is_none());
634 assert_eq!(tools[1]["name"], "provider_tool");
635 assert_eq!(tools[1]["cache_control"]["type"], "ephemeral");
636 }
637
638 #[test]
639 fn test_streaming_prompt_cache_control_uses_raw_top_level_ttl() {
640 let mut additional_params = json!({
641 "cache_control": {"type": "ephemeral", "ttl": "1h"}
642 });
643 let top_level_cache_control =
644 resolve_top_level_cache_control(false, None, &mut additional_params).unwrap();
645 let mut tools = build_tool_definitions(
646 vec![crate::completion::ToolDefinition {
647 name: "rig_tool".to_string(),
648 description: "Rig tool".to_string(),
649 parameters: json!({"type": "object", "properties": {}}),
650 }],
651 &mut additional_params,
652 )
653 .unwrap();
654 let mut system = vec![SystemContent::Text {
655 text: "System prompt".to_string(),
656 cache_control: None,
657 }];
658 let mut messages: Vec<Message> = Vec::new();
659
660 apply_prompt_cache_control(
661 &mut system,
662 &mut messages,
663 &mut tools,
664 true,
665 top_level_cache_control.as_ref(),
666 )
667 .unwrap();
668
669 assert_eq!(tools[0]["cache_control"]["type"], "ephemeral");
670 assert_eq!(tools[0]["cache_control"]["ttl"], "1h");
671 match &system[0] {
672 SystemContent::Text {
673 cache_control: Some(CacheControl::Ephemeral { ttl }),
674 ..
675 } => assert_eq!(ttl.as_ref(), Some(&CacheTtl::OneHour)),
676 other => panic!("expected system cache_control, got {other:?}"),
677 }
678 assert!(additional_params.get("cache_control").is_none());
679 }
680
681 fn handle_event(
682 event: &StreamingEvent,
683 current_tool_call: &mut Option<ToolCallState>,
684 current_thinking: &mut Option<ThinkingState>,
685 ) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
686 let mut server_tool_uses = HashMap::new();
687 super::handle_event(
688 event,
689 current_tool_call,
690 &mut server_tool_uses,
691 current_thinking,
692 )
693 }
694
695 #[test]
696 fn test_thinking_delta_deserialization() {
697 let json = r#"{"type": "thinking_delta", "thinking": "Let me think about this..."}"#;
698 let delta: ContentDelta = serde_json::from_str(json).unwrap();
699
700 match delta {
701 ContentDelta::ThinkingDelta { thinking } => {
702 assert_eq!(thinking, "Let me think about this...");
703 }
704 _ => panic!("Expected ThinkingDelta variant"),
705 }
706 }
707
708 #[test]
709 fn test_signature_delta_deserialization() {
710 let json = r#"{"type": "signature_delta", "signature": "abc123def456"}"#;
711 let delta: ContentDelta = serde_json::from_str(json).unwrap();
712
713 match delta {
714 ContentDelta::SignatureDelta { signature } => {
715 assert_eq!(signature, "abc123def456");
716 }
717 _ => panic!("Expected SignatureDelta variant"),
718 }
719 }
720
721 #[test]
722 fn test_thinking_delta_streaming_event_deserialization() {
723 let json = r#"{
724 "type": "content_block_delta",
725 "index": 0,
726 "delta": {
727 "type": "thinking_delta",
728 "thinking": "First, I need to understand the problem."
729 }
730 }"#;
731
732 let event: StreamingEvent = serde_json::from_str(json).unwrap();
733
734 match event {
735 StreamingEvent::ContentBlockDelta { index, delta } => {
736 assert_eq!(index, 0);
737 match delta {
738 ContentDelta::ThinkingDelta { thinking } => {
739 assert_eq!(thinking, "First, I need to understand the problem.");
740 }
741 _ => panic!("Expected ThinkingDelta"),
742 }
743 }
744 _ => panic!("Expected ContentBlockDelta event"),
745 }
746 }
747
748 #[test]
749 fn test_signature_delta_streaming_event_deserialization() {
750 let json = r#"{
751 "type": "content_block_delta",
752 "index": 0,
753 "delta": {
754 "type": "signature_delta",
755 "signature": "ErUBCkYICBgCIkCaGbqC85F4"
756 }
757 }"#;
758
759 let event: StreamingEvent = serde_json::from_str(json).unwrap();
760
761 match event {
762 StreamingEvent::ContentBlockDelta { index, delta } => {
763 assert_eq!(index, 0);
764 match delta {
765 ContentDelta::SignatureDelta { signature } => {
766 assert_eq!(signature, "ErUBCkYICBgCIkCaGbqC85F4");
767 }
768 _ => panic!("Expected SignatureDelta"),
769 }
770 }
771 _ => panic!("Expected ContentBlockDelta event"),
772 }
773 }
774
775 #[test]
776 fn test_handle_thinking_delta_event() {
777 let event = StreamingEvent::ContentBlockDelta {
778 index: 0,
779 delta: ContentDelta::ThinkingDelta {
780 thinking: "Analyzing the request...".to_string(),
781 },
782 };
783
784 let mut tool_call_state = None;
785 let mut thinking_state = None;
786 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
787
788 assert!(result.is_some());
789 let choice = result.unwrap().unwrap();
790
791 match choice {
792 RawStreamingChoice::ReasoningDelta { id, reasoning, .. } => {
793 assert_eq!(id, None);
794 assert_eq!(reasoning, "Analyzing the request...");
795 }
796 _ => panic!("Expected ReasoningDelta choice"),
797 }
798
799 assert!(thinking_state.is_some());
801 assert_eq!(thinking_state.unwrap().thinking, "Analyzing the request...");
802 }
803
804 #[test]
805 fn test_handle_signature_delta_event() {
806 let event = StreamingEvent::ContentBlockDelta {
807 index: 0,
808 delta: ContentDelta::SignatureDelta {
809 signature: "test_signature".to_string(),
810 },
811 };
812
813 let mut tool_call_state = None;
814 let mut thinking_state = None;
815 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
816
817 assert!(result.is_none());
819
820 assert!(thinking_state.is_some());
822 assert_eq!(thinking_state.unwrap().signature, "test_signature");
823 }
824
825 #[test]
826 fn test_handle_redacted_thinking_content_block_start_event() {
827 let event = StreamingEvent::ContentBlockStart {
828 index: 0,
829 content_block: Content::RedactedThinking {
830 data: "redacted_blob".to_string(),
831 },
832 };
833 let mut tool_call_state = None;
834 let mut thinking_state = None;
835 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
836
837 assert!(result.is_some());
838 match result.unwrap().unwrap() {
839 RawStreamingChoice::Reasoning {
840 content: ReasoningContent::Redacted { data },
841 ..
842 } => {
843 assert_eq!(data, "redacted_blob");
844 }
845 _ => panic!("Expected Redacted reasoning chunk"),
846 }
847 }
848
849 #[test]
850 fn test_handle_text_delta_event() {
851 let event = StreamingEvent::ContentBlockDelta {
852 index: 0,
853 delta: ContentDelta::TextDelta {
854 text: "Hello, world!".to_string(),
855 },
856 };
857
858 let mut tool_call_state = None;
859 let mut thinking_state = None;
860 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
861
862 assert!(result.is_some());
863 let choice = result.unwrap().unwrap();
864
865 match choice {
866 RawStreamingChoice::Message(text) => {
867 assert_eq!(text, "Hello, world!");
868 }
869 _ => panic!("Expected Message choice"),
870 }
871 }
872
873 #[test]
874 fn test_handle_text_block_start_event() {
875 let event = StreamingEvent::ContentBlockStart {
876 index: 0,
877 content_block: Content::Text {
878 text: String::new(),
879 citations: Vec::new(),
880 cache_control: None,
881 },
882 };
883
884 let mut tool_call_state = None;
885 let mut thinking_state = None;
886 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
887
888 assert!(result.is_some());
889 let choice = result.unwrap().unwrap();
890 assert!(matches!(
891 choice,
892 RawStreamingChoice::TextStart {
893 additional_params: None
894 }
895 ));
896 }
897
898 #[test]
899 fn test_thinking_delta_does_not_interfere_with_tool_calls() {
900 let event = StreamingEvent::ContentBlockDelta {
902 index: 0,
903 delta: ContentDelta::ThinkingDelta {
904 thinking: "Thinking while tool is active...".to_string(),
905 },
906 };
907
908 let mut tool_call_state = Some(ToolCallState {
909 name: "test_tool".to_string(),
910 id: "tool_123".to_string(),
911 internal_call_id: nanoid::nanoid!(),
912 input_json: String::new(),
913 });
914 let mut thinking_state = None;
915
916 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
917
918 assert!(result.is_some());
919 let choice = result.unwrap().unwrap();
920
921 match choice {
922 RawStreamingChoice::ReasoningDelta { reasoning, .. } => {
923 assert_eq!(reasoning, "Thinking while tool is active...");
924 }
925 _ => panic!("Expected ReasoningDelta choice"),
926 }
927
928 assert!(tool_call_state.is_some());
930 }
931
932 #[test]
933 fn test_handle_input_json_delta_event() {
934 let event = StreamingEvent::ContentBlockDelta {
935 index: 0,
936 delta: ContentDelta::InputJsonDelta {
937 partial_json: "{\"arg\":\"value".to_string(),
938 },
939 };
940
941 let mut tool_call_state = Some(ToolCallState {
942 name: "test_tool".to_string(),
943 id: "tool_123".to_string(),
944 internal_call_id: nanoid::nanoid!(),
945 input_json: String::new(),
946 });
947 let mut thinking_state = None;
948
949 let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
950
951 assert!(result.is_some());
953 let choice = result.unwrap().unwrap();
954
955 match choice {
956 RawStreamingChoice::ToolCallDelta {
957 id,
958 internal_call_id: _,
959 content,
960 } => {
961 assert_eq!(id, "tool_123");
962 match content {
963 ToolCallDeltaContent::Delta(delta) => assert_eq!(delta, "{\"arg\":\"value"),
964 _ => panic!("Expected Delta content"),
965 }
966 }
967 _ => panic!("Expected ToolCallDelta choice, got {:?}", choice),
968 }
969
970 assert!(tool_call_state.is_some());
972 let state = tool_call_state.unwrap();
973 assert_eq!(state.input_json, "{\"arg\":\"value");
974 }
975
976 #[test]
977 fn test_tool_call_accumulation_with_multiple_deltas() {
978 let mut tool_call_state = Some(ToolCallState {
979 name: "test_tool".to_string(),
980 id: "tool_123".to_string(),
981 internal_call_id: nanoid::nanoid!(),
982 input_json: String::new(),
983 });
984 let mut thinking_state = None;
985
986 let event1 = StreamingEvent::ContentBlockDelta {
988 index: 0,
989 delta: ContentDelta::InputJsonDelta {
990 partial_json: "{\"location\":".to_string(),
991 },
992 };
993 let result1 = handle_event(&event1, &mut tool_call_state, &mut thinking_state);
994 assert!(result1.is_some());
995
996 let event2 = StreamingEvent::ContentBlockDelta {
998 index: 0,
999 delta: ContentDelta::InputJsonDelta {
1000 partial_json: "\"Paris\",".to_string(),
1001 },
1002 };
1003 let result2 = handle_event(&event2, &mut tool_call_state, &mut thinking_state);
1004 assert!(result2.is_some());
1005
1006 let event3 = StreamingEvent::ContentBlockDelta {
1008 index: 0,
1009 delta: ContentDelta::InputJsonDelta {
1010 partial_json: "\"temp\":\"20C\"}".to_string(),
1011 },
1012 };
1013 let result3 = handle_event(&event3, &mut tool_call_state, &mut thinking_state);
1014 assert!(result3.is_some());
1015
1016 assert!(tool_call_state.is_some());
1018 let state = tool_call_state.as_ref().unwrap();
1019 assert_eq!(
1020 state.input_json,
1021 "{\"location\":\"Paris\",\"temp\":\"20C\"}"
1022 );
1023
1024 let stop_event = StreamingEvent::ContentBlockStop { index: 0 };
1026 let final_result = handle_event(&stop_event, &mut tool_call_state, &mut thinking_state);
1027 assert!(final_result.is_some());
1028
1029 match final_result.unwrap().unwrap() {
1030 RawStreamingChoice::ToolCall(RawStreamingToolCall {
1031 id,
1032 name,
1033 arguments,
1034 ..
1035 }) => {
1036 assert_eq!(id, "tool_123");
1037 assert_eq!(name, "test_tool");
1038 assert_eq!(
1039 arguments.get("location").unwrap().as_str().unwrap(),
1040 "Paris"
1041 );
1042 assert_eq!(arguments.get("temp").unwrap().as_str().unwrap(), "20C");
1043 }
1044 other => panic!("Expected ToolCall, got {:?}", other),
1045 }
1046
1047 assert!(tool_call_state.is_none());
1049 }
1050
1051 #[test]
1052 fn test_citations_delta_streaming_event_deserialization() {
1053 let json = r#"{
1054 "type": "content_block_delta",
1055 "index": 0,
1056 "delta": {
1057 "type": "citations_delta",
1058 "citation": {
1059 "type": "char_location",
1060 "cited_text": "The grass is green.",
1061 "document_index": 0,
1062 "document_title": "Example",
1063 "start_char_index": 0,
1064 "end_char_index": 20
1065 }
1066 }
1067 }"#;
1068
1069 let event: StreamingEvent = serde_json::from_str(json).unwrap();
1070 let StreamingEvent::ContentBlockDelta { index, delta } = event else {
1071 panic!("expected ContentBlockDelta");
1072 };
1073 assert_eq!(index, 0);
1074 let ContentDelta::CitationsDelta { citation } = delta else {
1075 panic!("expected CitationsDelta");
1076 };
1077 let crate::providers::anthropic::completion::Citation::CharLocation {
1078 start_char_index,
1079 end_char_index,
1080 ..
1081 } = citation
1082 else {
1083 panic!("expected CharLocation");
1084 };
1085 assert_eq!(start_char_index, 0);
1086 assert_eq!(end_char_index, 20);
1087 }
1088
1089 #[test]
1090 fn test_search_result_citations_delta_streaming_event_deserialization() {
1091 let json = r#"{
1092 "type": "content_block_delta",
1093 "index": 0,
1094 "delta": {
1095 "type": "citations_delta",
1096 "citation": {
1097 "type": "search_result_location",
1098 "cited_text": "API requests require a key.",
1099 "source": "https://docs.example.com/api-reference",
1100 "title": "API Reference",
1101 "search_result_index": 0,
1102 "start_block_index": 0,
1103 "end_block_index": 1
1104 }
1105 }
1106 }"#;
1107
1108 let event: StreamingEvent = serde_json::from_str(json).unwrap();
1109 let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1110 panic!("expected ContentBlockDelta");
1111 };
1112 let ContentDelta::CitationsDelta { citation } = delta else {
1113 panic!("expected CitationsDelta");
1114 };
1115 assert!(matches!(
1116 citation,
1117 crate::providers::anthropic::completion::Citation::SearchResultLocation {
1118 search_result_index: 0,
1119 start_block_index: 0,
1120 end_block_index: 1,
1121 ..
1122 }
1123 ));
1124 }
1125
/// A `citations_delta` whose citation is a `web_search_result_location` must
/// deserialize into `Citation::WebSearchResultLocation`, keeping the URL and
/// the encrypted index from the payload.
#[test]
fn test_web_search_result_citations_delta_streaming_event_deserialization() {
    use crate::providers::anthropic::completion::Citation;

    let payload = r#"{
        "type": "content_block_delta",
        "index": 0,
        "delta": {
            "type": "citations_delta",
            "citation": {
                "type": "web_search_result_location",
                "cited_text": "Claude Shannon was a mathematician.",
                "url": "https://example.com/shannon",
                "title": "Claude Shannon",
                "encrypted_index": "encrypted-reference"
            }
        }
    }"#;

    let citation = match serde_json::from_str::<StreamingEvent>(payload).unwrap() {
        StreamingEvent::ContentBlockDelta {
            delta: ContentDelta::CitationsDelta { citation },
            ..
        } => citation,
        StreamingEvent::ContentBlockDelta { .. } => panic!("expected CitationsDelta"),
        _ => panic!("expected ContentBlockDelta"),
    };

    // Match through a borrow so the string fields are compared in place.
    assert!(matches!(
        &citation,
        Citation::WebSearchResultLocation {
            url,
            encrypted_index,
            ..
        } if url == "https://example.com/shannon"
            && encrypted_index == "encrypted-reference"
    ));
}
1160
/// A `web_search_result_location` citation with `"title": null` must still
/// deserialize, yielding `title: None`.
#[test]
fn test_web_search_result_citations_delta_allows_null_title() {
    use crate::providers::anthropic::completion::Citation;

    let payload = r#"{
        "type": "content_block_delta",
        "index": 0,
        "delta": {
            "type": "citations_delta",
            "citation": {
                "type": "web_search_result_location",
                "cited_text": "Claude Shannon was a mathematician.",
                "url": "https://example.com/shannon",
                "title": null,
                "encrypted_index": "encrypted-reference"
            }
        }
    }"#;

    let citation = match serde_json::from_str::<StreamingEvent>(payload).unwrap() {
        StreamingEvent::ContentBlockDelta {
            delta: ContentDelta::CitationsDelta { citation },
            ..
        } => citation,
        StreamingEvent::ContentBlockDelta { .. } => panic!("expected CitationsDelta"),
        _ => panic!("expected ContentBlockDelta"),
    };

    assert!(matches!(
        citation,
        Citation::WebSearchResultLocation { title: None, .. }
    ));
}
1193
/// `content_block_start` events for the two web-search block types
/// (`server_tool_use` and `web_search_tool_result`) must deserialize into the
/// matching `Content` variants with their payload fields intact.
#[test]
fn test_web_search_content_block_start_events_deserialize() {
    // --- server_tool_use -------------------------------------------------
    let tool_use_payload = r#"{
        "type": "content_block_start",
        "index": 1,
        "content_block": {
            "type": "server_tool_use",
            "id": "srvtoolu_01",
            "name": "web_search",
            "input": {
                "query": "claude shannon birth date"
            }
        }
    }"#;
    let tool_use_event = serde_json::from_str::<StreamingEvent>(tool_use_payload).unwrap();
    assert!(matches!(
        &tool_use_event,
        StreamingEvent::ContentBlockStart {
            content_block: Content::ServerToolUse { id, name, input },
            ..
        } if id == "srvtoolu_01"
            && name == "web_search"
            && input["query"] == "claude shannon birth date"
    ));

    // --- web_search_tool_result -----------------------------------------
    let tool_result_payload = r#"{
        "type": "content_block_start",
        "index": 2,
        "content_block": {
            "type": "web_search_tool_result",
            "tool_use_id": "srvtoolu_01",
            "content": [{
                "type": "web_search_result",
                "url": "https://example.com/shannon",
                "title": "Claude Shannon",
                "encrypted_content": "encrypted-content"
            }]
        }
    }"#;
    let tool_result_event = serde_json::from_str::<StreamingEvent>(tool_result_payload).unwrap();
    assert!(matches!(
        &tool_result_event,
        StreamingEvent::ContentBlockStart {
            content_block: Content::WebSearchToolResult {
                tool_use_id,
                content
            },
            ..
        } if tool_use_id == "srvtoolu_01"
            && content[0]["encrypted_content"] == "encrypted-content"
    ));
}
1250
1251 #[tokio::test]
1252 async fn test_streaming_web_search_blocks_are_preserved_on_final_choice() {
1253 let raw_stream = stream! {
1254 let mut tool_call_state = None;
1255 let mut server_tool_uses = HashMap::new();
1256 let mut thinking_state = None;
1257
1258 let server_tool_use_start = super::handle_event(
1259 &StreamingEvent::ContentBlockStart {
1260 index: 0,
1261 content_block: Content::ServerToolUse {
1262 id: "srvtoolu_01".to_string(),
1263 name: "web_search".to_string(),
1264 input: serde_json::Value::Null,
1265 },
1266 },
1267 &mut tool_call_state,
1268 &mut server_tool_uses,
1269 &mut thinking_state,
1270 );
1271 assert!(
1272 server_tool_use_start.is_none(),
1273 "server_tool_use start should be accumulated until its input JSON is complete"
1274 );
1275
1276 let server_tool_use_delta = super::handle_event(
1277 &StreamingEvent::ContentBlockDelta {
1278 index: 0,
1279 delta: ContentDelta::InputJsonDelta {
1280 partial_json: r#"{"query":"claude shannon birth date"}"#.to_string(),
1281 },
1282 },
1283 &mut tool_call_state,
1284 &mut server_tool_uses,
1285 &mut thinking_state,
1286 );
1287 assert!(
1288 server_tool_use_delta.is_none(),
1289 "server_tool_use input JSON should not be emitted as a Rig tool-call delta"
1290 );
1291
1292 yield super::handle_event(
1293 &StreamingEvent::ContentBlockStop { index: 0 },
1294 &mut tool_call_state,
1295 &mut server_tool_uses,
1296 &mut thinking_state,
1297 )
1298 .expect("server_tool_use stop should produce completed raw metadata");
1299
1300 yield super::handle_event(
1301 &StreamingEvent::ContentBlockStart {
1302 index: 1,
1303 content_block: Content::WebSearchToolResult {
1304 tool_use_id: "srvtoolu_01".to_string(),
1305 content: serde_json::json!([{
1306 "type": "web_search_result",
1307 "url": "https://example.com/shannon",
1308 "title": "Claude Shannon",
1309 "encrypted_content": "encrypted-content"
1310 }]),
1311 },
1312 },
1313 &mut tool_call_state,
1314 &mut server_tool_uses,
1315 &mut thinking_state,
1316 )
1317 .expect("web_search_tool_result block should produce raw metadata");
1318
1319 yield super::handle_event(
1320 &StreamingEvent::ContentBlockStart {
1321 index: 2,
1322 content_block: Content::Text {
1323 text: String::new(),
1324 citations: Vec::new(),
1325 cache_control: None,
1326 },
1327 },
1328 &mut tool_call_state,
1329 &mut server_tool_uses,
1330 &mut thinking_state,
1331 )
1332 .expect("text block start should produce a raw choice");
1333
1334 yield super::handle_event(
1335 &StreamingEvent::ContentBlockDelta {
1336 index: 2,
1337 delta: ContentDelta::TextDelta {
1338 text: "Claude Shannon was born on April 30, 1916.".to_string(),
1339 },
1340 },
1341 &mut tool_call_state,
1342 &mut server_tool_uses,
1343 &mut thinking_state,
1344 )
1345 .expect("text delta should produce a raw choice");
1346
1347 yield super::handle_event(
1348 &StreamingEvent::ContentBlockDelta {
1349 index: 2,
1350 delta: ContentDelta::CitationsDelta {
1351 citation: crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1352 cited_text: "Claude Shannon was born on April 30, 1916.".to_string(),
1353 url: "https://example.com/shannon".to_string(),
1354 title: Some("Claude Shannon".to_string()),
1355 encrypted_index: "encrypted-index".to_string(),
1356 },
1357 },
1358 },
1359 &mut tool_call_state,
1360 &mut server_tool_uses,
1361 &mut thinking_state,
1362 )
1363 .expect("citation delta should produce a raw choice");
1364
1365 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1366 usage: PartialUsage::default(),
1367 }));
1368 };
1369
1370 let mut stream =
1371 crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1372 while stream.next().await.is_some() {}
1373
1374 let choice_items: Vec<crate::message::AssistantContent> =
1375 stream.choice.clone().into_iter().collect();
1376 assert_eq!(choice_items.len(), 3);
1377 assert!(
1378 choice_items
1379 .iter()
1380 .all(|item| !matches!(item, crate::message::AssistantContent::ToolCall(_))),
1381 "provider-owned web-search blocks must not become Rig client tool calls"
1382 );
1383
1384 let Some(crate::message::AssistantContent::Text(server_tool_use)) = choice_items.first()
1385 else {
1386 panic!("expected raw server_tool_use metadata");
1387 };
1388 assert_eq!(
1389 server_tool_use.additional_params.as_ref().unwrap()
1390 [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["type"],
1391 "server_tool_use"
1392 );
1393 assert_eq!(
1394 server_tool_use.additional_params.as_ref().unwrap()
1395 [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["input"]["query"],
1396 "claude shannon birth date"
1397 );
1398
1399 let Some(crate::message::AssistantContent::Text(web_search_result)) = choice_items.get(1)
1400 else {
1401 panic!("expected raw web_search_tool_result metadata");
1402 };
1403 assert_eq!(
1404 web_search_result.additional_params.as_ref().unwrap()
1405 [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["content"][0]
1406 ["encrypted_content"],
1407 "encrypted-content"
1408 );
1409
1410 let Some(crate::message::AssistantContent::Text(answer)) = choice_items.get(2) else {
1411 panic!("expected answer text");
1412 };
1413 assert_eq!(answer.text, "Claude Shannon was born on April 30, 1916.");
1414 let citations = crate::providers::anthropic::completion::anthropic_citations(answer)
1415 .expect("expected preserved citations");
1416 assert!(matches!(
1417 citations.first(),
1418 Some(crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1419 encrypted_index,
1420 ..
1421 }) if encrypted_index == "encrypted-index"
1422 ));
1423 }
1424
/// A `citations_delta` event passed to `handle_event` must come back as a
/// `RawStreamingChoice::TextAdditionalParams` whose `citations` array holds
/// the citation.
#[test]
fn test_handle_citations_delta_event_preserves_metadata() {
    let event = StreamingEvent::ContentBlockDelta {
        index: 0,
        delta: ContentDelta::CitationsDelta {
            citation: crate::providers::anthropic::completion::Citation::CharLocation {
                cited_text: "The grass is green.".to_string(),
                document_index: 0,
                document_title: Some("Example".to_string()),
                start_char_index: 0,
                end_char_index: 20,
            },
        },
    };

    // `handle_event` is called with server-tool-use state in the web-search
    // test of this module; pass the same argument list here so both tests
    // agree on one signature.
    let mut tool_call_state = None;
    let mut server_tool_uses = HashMap::new();
    let mut thinking_state = None;
    let result = handle_event(
        &event,
        &mut tool_call_state,
        &mut server_tool_uses,
        &mut thinking_state,
    );

    let choice = result
        .expect("citations delta should produce a raw choice")
        .expect("citations delta should not produce an error");
    let RawStreamingChoice::TextAdditionalParams(additional_params) = choice else {
        panic!("expected TextAdditionalParams choice");
    };
    assert_eq!(additional_params["citations"][0]["type"], "char_location");
}
1451
1452 #[tokio::test]
1453 async fn test_streaming_citation_deltas_are_preserved_on_final_text() {
1454 let citation = crate::providers::anthropic::completion::Citation::CharLocation {
1455 cited_text: "The grass is green.".to_string(),
1456 document_index: 0,
1457 document_title: Some("Example".to_string()),
1458 start_char_index: 0,
1459 end_char_index: 20,
1460 };
1461
1462 let raw_stream = stream! {
1463 let mut tool_call_state = None;
1464 let mut thinking_state = None;
1465
1466 yield handle_event(
1467 &StreamingEvent::ContentBlockStart {
1468 index: 0,
1469 content_block: Content::Text {
1470 text: String::new(),
1471 citations: Vec::new(),
1472 cache_control: None,
1473 },
1474 },
1475 &mut tool_call_state,
1476 &mut thinking_state,
1477 )
1478 .expect("text block start should produce a raw choice");
1479
1480 yield handle_event(
1481 &StreamingEvent::ContentBlockDelta {
1482 index: 0,
1483 delta: ContentDelta::TextDelta {
1484 text: "the grass is green".to_string(),
1485 },
1486 },
1487 &mut tool_call_state,
1488 &mut thinking_state,
1489 )
1490 .expect("text delta should produce a raw choice");
1491
1492 yield handle_event(
1493 &StreamingEvent::ContentBlockDelta {
1494 index: 0,
1495 delta: ContentDelta::CitationsDelta {
1496 citation: crate::providers::anthropic::completion::Citation::CharLocation {
1497 cited_text: "The grass is green.".to_string(),
1498 document_index: 0,
1499 document_title: Some("Example".to_string()),
1500 start_char_index: 0,
1501 end_char_index: 20,
1502 },
1503 },
1504 },
1505 &mut tool_call_state,
1506 &mut thinking_state,
1507 )
1508 .expect("citation delta should produce a raw choice");
1509
1510 yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1511 usage: PartialUsage::default(),
1512 }));
1513 };
1514
1515 let mut stream =
1516 crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1517 while stream.next().await.is_some() {}
1518
1519 let choice_items: Vec<crate::message::AssistantContent> =
1520 stream.choice.clone().into_iter().collect();
1521 let Some(crate::message::AssistantContent::Text(text)) = choice_items.first() else {
1522 panic!("expected accumulated text item");
1523 };
1524
1525 assert_eq!(text.text, "the grass is green");
1526 let citations = crate::providers::anthropic::completion::anthropic_citations(text).unwrap();
1527 assert_eq!(citations, vec![citation]);
1528 }
1529
/// A delta with a `type` tag this module does not know must deserialize to
/// `ContentDelta::Unknown` instead of failing.
#[test]
fn test_unknown_content_delta_falls_back() {
    let parsed = serde_json::from_str::<ContentDelta>(
        r#"{"type": "something_new_from_anthropic", "field": "x"}"#,
    )
    .unwrap();
    assert!(matches!(parsed, ContentDelta::Unknown));
}
1536}