Skip to main content

rig_core/providers/anthropic/
streaming.rs

1use async_stream::stream;
2use futures::StreamExt;
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{Level, enabled, info_span};
6use tracing_futures::Instrument;
7
8use super::completion::{
9    AnthropicCompatibleProvider, Content, GenericCompletionModel, Message, SystemContent,
10    ToolChoice, Usage, apply_prompt_cache_control, build_tool_definitions,
11    resolve_top_level_cache_control, split_system_messages_from_history,
12    supports_mid_conversation_system_messages,
13};
14use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
15use crate::http_client::sse::{Event, GenericEventSource};
16use crate::http_client::{self, HttpClientExt};
17use crate::json_utils::merge_inplace;
18use crate::message::ReasoningContent;
19use crate::streaming::{
20    self, RawStreamingChoice, RawStreamingToolCall, StreamingResult, ToolCallDeltaContent,
21};
22use crate::telemetry::SpanCombinator;
23use crate::wasm_compat::{WasmCompatSend, WasmCompatSync};
24use std::collections::HashMap;
25
/// One event decoded from the streaming response.
///
/// The variant is selected by the JSON `type` field, matched in `snake_case`
/// (for example `content_block_delta`).
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamingEvent {
    /// Opens the message and carries its metadata and initial usage.
    MessageStart {
        message: MessageStart,
    },
    /// Opens the content block at `index`.
    ContentBlockStart {
        index: usize,
        content_block: Content,
    },
    /// Incremental update for the content block at `index`.
    ContentBlockDelta {
        index: usize,
        delta: ContentDelta,
    },
    /// Closes the content block at `index`.
    ContentBlockStop {
        index: usize,
    },
    /// Message-level update; carries the stop information and usage counters.
    MessageDelta {
        delta: MessageDelta,
        usage: PartialUsage,
    },
    /// Marks the end of the message.
    MessageStop,
    /// Keep-alive event with no payload.
    Ping,
    /// Catch-all for any `type` value not listed above.
    #[serde(other)]
    Unknown,
}
52
/// Payload of a [`StreamingEvent::MessageStart`] event.
///
/// The streaming loop in this file reads `id` and `model` for span recording
/// and `usage.input_tokens` for the final usage report.
#[derive(Debug, Deserialize)]
pub struct MessageStart {
    pub id: String,
    pub role: String,
    pub content: Vec<Content>,
    pub model: String,
    pub stop_reason: Option<String>,
    pub stop_sequence: Option<String>,
    pub usage: Usage,
}
63
/// Incremental payload of a [`StreamingEvent::ContentBlockDelta`] event.
///
/// The variant is selected by the JSON `type` field, matched in `snake_case`.
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentDelta {
    /// A fragment of assistant text.
    TextDelta {
        text: String,
    },
    /// A fragment of a tool call's JSON input; fragments are concatenated and
    /// parsed once the block stops.
    InputJsonDelta {
        partial_json: String,
    },
    /// A fragment of thinking (reasoning) text.
    ThinkingDelta {
        thinking: String,
    },
    /// A fragment of the signature attached to a thinking block.
    SignatureDelta {
        signature: String,
    },
    /// A citation attached to the current text block.
    CitationsDelta {
        citation: super::completion::Citation,
    },
    /// Forward-compatibility fallback. Any delta type Anthropic adds in the
    /// future that this crate does not yet model deserializes here so the
    /// surrounding [`StreamingEvent`] still parses.
    #[serde(other)]
    Unknown,
}
88
/// The `delta` object of a [`StreamingEvent::MessageDelta`] event.
///
/// The streaming loop in this file treats a present `stop_reason` as the
/// signal to capture the final usage and stop reading events.
#[derive(Debug, Deserialize)]
pub struct MessageDelta {
    pub stop_reason: Option<String>,
    pub stop_sequence: Option<String>,
}
94
/// Token counters reported while streaming.
///
/// Only `output_tokens` is required when deserializing; the other counters
/// fall back to `None` when the field is absent.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
pub struct PartialUsage {
    pub output_tokens: usize,
    #[serde(default)]
    pub input_tokens: Option<usize>,
    #[serde(default)]
    pub cache_creation_input_tokens: Option<u64>,
    #[serde(default)]
    pub cache_read_input_tokens: Option<u64>,
}
105
106impl GetTokenUsage for PartialUsage {
107    fn token_usage(&self) -> Option<crate::completion::Usage> {
108        let mut usage = crate::completion::Usage::new();
109
110        usage.input_tokens = self.input_tokens.unwrap_or_default() as u64;
111        usage.output_tokens = self.output_tokens as u64;
112        usage.cached_input_tokens = self.cache_read_input_tokens.unwrap_or(0);
113        usage.cache_creation_input_tokens = self.cache_creation_input_tokens.unwrap_or(0);
114        usage.total_tokens = usage.input_tokens
115            + usage.cached_input_tokens
116            + usage.cache_creation_input_tokens
117            + usage.output_tokens;
118        Some(usage)
119    }
120}
121
/// Accumulator for the client tool call whose content block is currently open.
#[derive(Default)]
struct ToolCallState {
    // Tool name taken from the `tool_use` content block.
    name: String,
    // Tool-call id taken from the `tool_use` content block.
    id: String,
    // Locally generated id (nanoid) attached to every delta and to the final call.
    internal_call_id: String,
    // Concatenation of the `partial_json` fragments received so far.
    input_json: String,
}
129
/// Accumulator for a server tool-use content block, keyed by block index in
/// the streaming loop.
struct ServerToolUseState {
    name: String,
    id: String,
    // `input` as delivered on the block-start event; used when no JSON
    // fragments arrive before the block stops.
    initial_input: Value,
    // Concatenation of the `partial_json` fragments received for this block.
    input_json: String,
}
136
/// Accumulator for the thinking block that is currently open.
#[derive(Default)]
struct ThinkingState {
    // Concatenated thinking text fragments.
    thinking: String,
    // Concatenated signature fragments; empty when none were received.
    signature: String,
}
142
/// Final item of a streamed completion; carries the usage gathered during the
/// stream.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StreamingCompletionResponse {
    pub usage: PartialUsage,
}
147
148impl GetTokenUsage for StreamingCompletionResponse {
149    fn token_usage(&self) -> Option<crate::completion::Usage> {
150        let mut usage = crate::completion::Usage::new();
151        usage.input_tokens = self.usage.input_tokens.unwrap_or(0) as u64;
152        usage.output_tokens = self.usage.output_tokens as u64;
153        usage.cached_input_tokens = self.usage.cache_read_input_tokens.unwrap_or(0);
154        usage.cache_creation_input_tokens = self.usage.cache_creation_input_tokens.unwrap_or(0);
155        usage.total_tokens = usage.input_tokens
156            + usage.cached_input_tokens
157            + usage.cache_creation_input_tokens
158            + usage.output_tokens;
159
160        Some(usage)
161    }
162}
163
164impl<Ext, T> GenericCompletionModel<Ext, T>
165where
166    T: HttpClientExt + Clone + Default + 'static,
167    Ext: AnthropicCompatibleProvider + Clone + WasmCompatSend + WasmCompatSync + 'static,
168{
169    pub(crate) async fn stream(
170        &self,
171        mut completion_request: CompletionRequest,
172    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
173    {
174        let request_model = completion_request
175            .model
176            .clone()
177            .unwrap_or_else(|| self.model.clone());
178        let span = if tracing::Span::current().is_disabled() {
179            info_span!(
180                target: "rig::completions",
181                "chat_streaming",
182                gen_ai.operation.name = "chat_streaming",
183                gen_ai.provider.name = Ext::PROVIDER_NAME,
184                gen_ai.request.model = &request_model,
185                gen_ai.system_instructions = &completion_request.preamble,
186                gen_ai.response.id = tracing::field::Empty,
187                gen_ai.response.model = &request_model,
188                gen_ai.usage.output_tokens = tracing::field::Empty,
189                gen_ai.usage.input_tokens = tracing::field::Empty,
190                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
191                gen_ai.usage.cache_creation.input_tokens = tracing::field::Empty,
192                gen_ai.input.messages = tracing::field::Empty,
193                gen_ai.output.messages = tracing::field::Empty,
194            )
195        } else {
196            tracing::Span::current()
197        };
198        let max_tokens = if let Some(tokens) = completion_request.max_tokens {
199            tokens
200        } else if let Some(tokens) = self.default_max_tokens {
201            tokens
202        } else {
203            return Err(CompletionError::RequestError(
204                "`max_tokens` must be set for Anthropic".into(),
205            ));
206        };
207
208        let docs = completion_request.normalized_documents();
209        let (history_system, chat_history) = split_system_messages_from_history(
210            completion_request.chat_history.into_iter().collect(),
211            supports_mid_conversation_system_messages(&request_model),
212        );
213        let mut full_history = vec![];
214        if let Some(docs) = docs {
215            full_history.push(docs);
216        }
217        full_history.extend(chat_history);
218
219        let mut messages = full_history
220            .into_iter()
221            .map(Message::try_from)
222            .collect::<Result<Vec<Message>, _>>()?;
223
224        // Convert system prompt to array format for cache_control support
225        let mut system: Vec<SystemContent> =
226            if let Some(preamble) = completion_request.preamble.as_ref() {
227                if preamble.is_empty() {
228                    vec![]
229                } else {
230                    vec![SystemContent::Text {
231                        text: preamble.clone(),
232                        cache_control: None,
233                    }]
234                }
235            } else {
236                vec![]
237            };
238        system.extend(history_system);
239
240        let mut additional_params_payload = completion_request
241            .additional_params
242            .take()
243            .unwrap_or(Value::Null);
244        let top_level_cache_control = resolve_top_level_cache_control(
245            self.automatic_caching,
246            self.automatic_caching_ttl.clone(),
247            &mut additional_params_payload,
248        )?;
249        let mut tools =
250            build_tool_definitions(completion_request.tools, &mut additional_params_payload)?;
251
252        apply_prompt_cache_control(
253            &mut system,
254            &mut messages,
255            &mut tools,
256            self.prompt_caching,
257            top_level_cache_control.as_ref(),
258        )?;
259
260        let mut body = json!({
261            "model": request_model,
262            "messages": messages,
263            "max_tokens": max_tokens,
264            "stream": true,
265        });
266
267        // Automatic caching: one top-level field; the API moves the breakpoint automatically.
268        // No beta header is required.
269        if let Some(cache_control) = top_level_cache_control {
270            merge_inplace(
271                &mut body,
272                json!({ "cache_control": serde_json::to_value(&cache_control)? }),
273            );
274        }
275
276        // Add system prompt if non-empty
277        if !system.is_empty() {
278            merge_inplace(&mut body, json!({ "system": system }));
279        }
280
281        if let Some(temperature) = completion_request.temperature {
282            merge_inplace(&mut body, json!({ "temperature": temperature }));
283        }
284
285        if !tools.is_empty() {
286            merge_inplace(
287                &mut body,
288                json!({
289                    "tools": tools,
290                    "tool_choice": ToolChoice::Auto,
291                }),
292            );
293        }
294
295        if !additional_params_payload.is_null() {
296            merge_inplace(&mut body, additional_params_payload)
297        }
298
299        if enabled!(Level::TRACE) {
300            tracing::trace!(
301                target: "rig::completions",
302                "Anthropic completion request: {}",
303                serde_json::to_string_pretty(&body)?
304            );
305        }
306
307        let body: Vec<u8> = serde_json::to_vec(&body)?;
308
309        let req = self
310            .client
311            .post("/v1/messages")?
312            .body(body)
313            .map_err(http_client::Error::Protocol)?;
314
315        let stream = GenericEventSource::new(self.client.clone(), req);
316
317        // Use our SSE decoder to directly handle Server-Sent Events format
318        let stream: StreamingResult<StreamingCompletionResponse> = Box::pin(stream! {
319            let mut current_tool_call: Option<ToolCallState> = None;
320            let mut server_tool_uses: HashMap<usize, ServerToolUseState> = HashMap::new();
321            let mut current_thinking: Option<ThinkingState> = None;
322            let mut sse_stream = Box::pin(stream);
323            let mut input_tokens = 0;
324            let mut final_usage = None;
325
326            let mut text_content = String::new();
327
328            while let Some(sse_result) = sse_stream.next().await {
329                match sse_result {
330                    Ok(Event::Open) => {}
331                    Ok(Event::Message(sse)) => {
332                        // Parse the SSE data as a StreamingEvent
333                        match serde_json::from_str::<StreamingEvent>(&sse.data) {
334                            Ok(event) => {
335                                match &event {
336                                    StreamingEvent::MessageStart { message } => {
337                                        input_tokens = message.usage.input_tokens;
338
339                                        let span = tracing::Span::current();
340                                        span.record("gen_ai.response.id", &message.id);
341                                        span.record("gen_ai.response.model", &message.model);
342                                    },
343                                    StreamingEvent::MessageDelta { delta, usage } => {
344                                        if delta.stop_reason.is_some() {
345                                            // cache_creation_input_tokens and cache_read_input_tokens
346                                            // are cumulative totals on message_delta.usage per the
347                                            // Anthropic streaming API spec — use them directly.
348                                            let usage = PartialUsage {
349                                                 output_tokens: usage.output_tokens,
350                                                 input_tokens: usize::try_from(input_tokens).ok(),
351                                                 cache_creation_input_tokens: usage.cache_creation_input_tokens,
352                                                 cache_read_input_tokens: usage.cache_read_input_tokens
353                                            };
354
355                                            let span = tracing::Span::current();
356                                            span.record_token_usage(&usage);
357                                            final_usage = Some(usage);
358                                            break;
359                                        }
360                                    }
361                                    _ => {}
362                                }
363
364                                if let Some(result) = handle_event(
365                                    &event,
366                                    &mut current_tool_call,
367                                    &mut server_tool_uses,
368                                    &mut current_thinking,
369                                ) {
370                                    if let Ok(RawStreamingChoice::Message(ref text)) = result {
371                                        text_content += text;
372                                    }
373                                    yield result;
374                                }
375                            },
376                            Err(e) => {
377                                if !sse.data.trim().is_empty() {
378                                    yield Err(CompletionError::ResponseError(
379                                        format!("Failed to parse JSON: {} (Data: {})", e, sse.data)
380                                    ));
381                                }
382                            }
383                        }
384                    },
385                    Err(e) => {
386                        yield Err(CompletionError::ProviderError(format!("SSE Error: {e}")));
387                        break;
388                    }
389                }
390            }
391
392            // Ensure event source is closed when stream ends
393            sse_stream.close();
394
395            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
396                usage: final_usage.unwrap_or_default()
397            }))
398        }.instrument(span));
399
400        Ok(streaming::StreamingCompletionResponse::stream(stream))
401    }
402}
403
404fn handle_event(
405    event: &StreamingEvent,
406    current_tool_call: &mut Option<ToolCallState>,
407    server_tool_uses: &mut HashMap<usize, ServerToolUseState>,
408    current_thinking: &mut Option<ThinkingState>,
409) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
410    match event {
411        StreamingEvent::ContentBlockDelta { index, delta } => match delta {
412            ContentDelta::TextDelta { text } => {
413                if current_tool_call.is_none() {
414                    return Some(Ok(RawStreamingChoice::Message(text.clone())));
415                }
416                None
417            }
418            ContentDelta::InputJsonDelta { partial_json } => {
419                if let Some(server_tool_use) = server_tool_uses.get_mut(index) {
420                    server_tool_use.input_json.push_str(partial_json);
421                    return None;
422                }
423
424                if let Some(tool_call) = current_tool_call {
425                    tool_call.input_json.push_str(partial_json);
426                    // Emit the delta so UI can show progress
427                    return Some(Ok(RawStreamingChoice::ToolCallDelta {
428                        id: tool_call.id.clone(),
429                        internal_call_id: tool_call.internal_call_id.clone(),
430                        content: ToolCallDeltaContent::Delta(partial_json.clone()),
431                    }));
432                }
433                None
434            }
435            ContentDelta::ThinkingDelta { thinking } => {
436                current_thinking
437                    .get_or_insert_with(ThinkingState::default)
438                    .thinking
439                    .push_str(thinking);
440
441                Some(Ok(RawStreamingChoice::ReasoningDelta {
442                    id: None,
443                    reasoning: thinking.clone(),
444                }))
445            }
446            ContentDelta::SignatureDelta { signature } => {
447                current_thinking
448                    .get_or_insert_with(ThinkingState::default)
449                    .signature
450                    .push_str(signature);
451
452                // Don't yield signature chunks, they will be included in the final Reasoning
453                None
454            }
455            ContentDelta::CitationsDelta { citation } => {
456                Some(Ok(RawStreamingChoice::TextAdditionalParams(json!({
457                    "citations": [citation]
458                }))))
459            }
460            ContentDelta::Unknown => None,
461        },
462        StreamingEvent::ContentBlockStart {
463            index,
464            content_block,
465        } => match content_block {
466            Content::Text { citations, .. } => {
467                let additional_params = (!citations.is_empty()).then(|| {
468                    json!({
469                        "citations": citations
470                    })
471                });
472                Some(Ok(RawStreamingChoice::TextStart { additional_params }))
473            }
474            Content::ServerToolUse { id, name, input } => {
475                server_tool_uses.insert(
476                    *index,
477                    ServerToolUseState {
478                        name: name.clone(),
479                        id: id.clone(),
480                        initial_input: input.clone(),
481                        input_json: String::new(),
482                    },
483                );
484                None
485            }
486            raw @ Content::WebSearchToolResult { .. } => Some(Ok(RawStreamingChoice::TextStart {
487                additional_params: Some(json!({
488                    super::completion::ANTHROPIC_RAW_CONTENT_KEY: raw
489                })),
490            })),
491            Content::ToolUse { id, name, .. } => {
492                let internal_call_id = nanoid::nanoid!();
493                *current_tool_call = Some(ToolCallState {
494                    name: name.clone(),
495                    id: id.clone(),
496                    internal_call_id: internal_call_id.clone(),
497                    input_json: String::new(),
498                });
499                Some(Ok(RawStreamingChoice::ToolCallDelta {
500                    id: id.clone(),
501                    internal_call_id,
502                    content: ToolCallDeltaContent::Name(name.clone()),
503                }))
504            }
505            Content::Thinking { .. } => {
506                *current_thinking = Some(ThinkingState::default());
507                None
508            }
509            Content::RedactedThinking { data } => Some(Ok(RawStreamingChoice::Reasoning {
510                id: None,
511                content: ReasoningContent::Redacted { data: data.clone() },
512            })),
513            // Handle other content types - they don't need special handling
514            _ => None,
515        },
516        StreamingEvent::ContentBlockStop { index } => {
517            if let Some(thinking_state) = Option::take(current_thinking)
518                && !thinking_state.thinking.is_empty()
519            {
520                let signature = if thinking_state.signature.is_empty() {
521                    None
522                } else {
523                    Some(thinking_state.signature)
524                };
525
526                return Some(Ok(RawStreamingChoice::Reasoning {
527                    id: None,
528                    content: ReasoningContent::Text {
529                        text: thinking_state.thinking,
530                        signature,
531                    },
532                }));
533            }
534
535            if let Some(server_tool_use) = server_tool_uses.remove(index) {
536                let input = if server_tool_use.input_json.is_empty() {
537                    if server_tool_use.initial_input.is_null() {
538                        json!({})
539                    } else {
540                        server_tool_use.initial_input
541                    }
542                } else {
543                    match serde_json::from_str(&server_tool_use.input_json) {
544                        Ok(json_value) => json_value,
545                        Err(e) => return Some(Err(CompletionError::from(e))),
546                    }
547                };
548
549                return Some(Ok(RawStreamingChoice::TextStart {
550                    additional_params: Some(json!({
551                        super::completion::ANTHROPIC_RAW_CONTENT_KEY: Content::ServerToolUse {
552                            id: server_tool_use.id,
553                            name: server_tool_use.name,
554                            input,
555                        }
556                    })),
557                }));
558            }
559
560            if let Some(tool_call) = Option::take(current_tool_call) {
561                let json_str = if tool_call.input_json.is_empty() {
562                    "{}"
563                } else {
564                    &tool_call.input_json
565                };
566                match serde_json::from_str(json_str) {
567                    Ok(json_value) => {
568                        let raw_tool_call =
569                            RawStreamingToolCall::new(tool_call.id, tool_call.name, json_value)
570                                .with_internal_call_id(tool_call.internal_call_id);
571                        Some(Ok(RawStreamingChoice::ToolCall(raw_tool_call)))
572                    }
573                    Err(e) => Some(Err(CompletionError::from(e))),
574                }
575            } else {
576                None
577            }
578        }
579        // Ignore other event types or handle as needed
580        StreamingEvent::MessageStart { .. }
581        | StreamingEvent::MessageDelta { .. }
582        | StreamingEvent::MessageStop
583        | StreamingEvent::Ping
584        | StreamingEvent::Unknown => None,
585    }
586}
587
588#[cfg(test)]
589mod tests {
590    use super::super::completion::{CacheControl, CacheTtl};
591    use super::*;
592    use async_stream::stream;
593    use futures::StreamExt;
594
    /// Boxes and pins `stream` as a [`crate::streaming::StreamingResult`].
    ///
    /// Variant compiled when not targeting wasm32 with the `wasm` feature; the
    /// input stream must be `Send`.
    #[cfg(not(all(feature = "wasm", target_arch = "wasm32")))]
    fn to_stream_result(
        stream: impl futures::Stream<
            Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
        > + Send
        + 'static,
    ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
        Box::pin(stream)
    }
604
    /// Boxes and pins `stream` as a [`crate::streaming::StreamingResult`].
    ///
    /// Variant compiled for wasm32 with the `wasm` feature; no `Send` bound is
    /// placed on the input stream.
    #[cfg(all(feature = "wasm", target_arch = "wasm32"))]
    fn to_stream_result(
        stream: impl futures::Stream<
            Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
        > + 'static,
    ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
        Box::pin(stream)
    }
613
614    #[test]
615    fn test_streaming_tool_build_marks_final_combined_tool() {
616        let mut additional_params = json!({
617            "tools": [{
618                "name": "provider_tool",
619                "description": "Provider tool",
620                "input_schema": {"type": "object"}
621            }]
622        });
623
624        let mut tools = build_tool_definitions(
625            vec![crate::completion::ToolDefinition {
626                name: "rig_tool".to_string(),
627                description: "Rig tool".to_string(),
628                parameters: json!({"type": "object", "properties": {}}),
629            }],
630            &mut additional_params,
631        )
632        .unwrap();
633        let mut system: Vec<SystemContent> = Vec::new();
634        let mut messages: Vec<Message> = Vec::new();
635        apply_prompt_cache_control(&mut system, &mut messages, &mut tools, true, None).unwrap();
636
637        assert_eq!(tools.len(), 2);
638        assert!(tools[0].get("cache_control").is_none());
639        assert_eq!(tools[1]["name"], "provider_tool");
640        assert_eq!(tools[1]["cache_control"]["type"], "ephemeral");
641    }
642
643    #[test]
644    fn test_streaming_prompt_cache_control_uses_raw_top_level_ttl() {
645        let mut additional_params = json!({
646            "cache_control": {"type": "ephemeral", "ttl": "1h"}
647        });
648        let top_level_cache_control =
649            resolve_top_level_cache_control(false, None, &mut additional_params).unwrap();
650        let mut tools = build_tool_definitions(
651            vec![crate::completion::ToolDefinition {
652                name: "rig_tool".to_string(),
653                description: "Rig tool".to_string(),
654                parameters: json!({"type": "object", "properties": {}}),
655            }],
656            &mut additional_params,
657        )
658        .unwrap();
659        let mut system = vec![SystemContent::Text {
660            text: "System prompt".to_string(),
661            cache_control: None,
662        }];
663        let mut messages: Vec<Message> = Vec::new();
664
665        apply_prompt_cache_control(
666            &mut system,
667            &mut messages,
668            &mut tools,
669            true,
670            top_level_cache_control.as_ref(),
671        )
672        .unwrap();
673
674        assert_eq!(tools[0]["cache_control"]["type"], "ephemeral");
675        assert_eq!(tools[0]["cache_control"]["ttl"], "1h");
676        match &system[0] {
677            SystemContent::Text {
678                cache_control: Some(CacheControl::Ephemeral { ttl }),
679                ..
680            } => assert_eq!(ttl.as_ref(), Some(&CacheTtl::OneHour)),
681            other => panic!("expected system cache_control, got {other:?}"),
682        }
683        assert!(additional_params.get("cache_control").is_none());
684    }
685
686    fn handle_event(
687        event: &StreamingEvent,
688        current_tool_call: &mut Option<ToolCallState>,
689        current_thinking: &mut Option<ThinkingState>,
690    ) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
691        let mut server_tool_uses = HashMap::new();
692        super::handle_event(
693            event,
694            current_tool_call,
695            &mut server_tool_uses,
696            current_thinking,
697        )
698    }
699
700    #[test]
701    fn test_thinking_delta_deserialization() {
702        let json = r#"{"type": "thinking_delta", "thinking": "Let me think about this..."}"#;
703        let delta: ContentDelta = serde_json::from_str(json).unwrap();
704
705        match delta {
706            ContentDelta::ThinkingDelta { thinking } => {
707                assert_eq!(thinking, "Let me think about this...");
708            }
709            _ => panic!("Expected ThinkingDelta variant"),
710        }
711    }
712
713    #[test]
714    fn test_signature_delta_deserialization() {
715        let json = r#"{"type": "signature_delta", "signature": "abc123def456"}"#;
716        let delta: ContentDelta = serde_json::from_str(json).unwrap();
717
718        match delta {
719            ContentDelta::SignatureDelta { signature } => {
720                assert_eq!(signature, "abc123def456");
721            }
722            _ => panic!("Expected SignatureDelta variant"),
723        }
724    }
725
726    #[test]
727    fn test_thinking_delta_streaming_event_deserialization() {
728        let json = r#"{
729            "type": "content_block_delta",
730            "index": 0,
731            "delta": {
732                "type": "thinking_delta",
733                "thinking": "First, I need to understand the problem."
734            }
735        }"#;
736
737        let event: StreamingEvent = serde_json::from_str(json).unwrap();
738
739        match event {
740            StreamingEvent::ContentBlockDelta { index, delta } => {
741                assert_eq!(index, 0);
742                match delta {
743                    ContentDelta::ThinkingDelta { thinking } => {
744                        assert_eq!(thinking, "First, I need to understand the problem.");
745                    }
746                    _ => panic!("Expected ThinkingDelta"),
747                }
748            }
749            _ => panic!("Expected ContentBlockDelta event"),
750        }
751    }
752
753    #[test]
754    fn test_signature_delta_streaming_event_deserialization() {
755        let json = r#"{
756            "type": "content_block_delta",
757            "index": 0,
758            "delta": {
759                "type": "signature_delta",
760                "signature": "ErUBCkYICBgCIkCaGbqC85F4"
761            }
762        }"#;
763
764        let event: StreamingEvent = serde_json::from_str(json).unwrap();
765
766        match event {
767            StreamingEvent::ContentBlockDelta { index, delta } => {
768                assert_eq!(index, 0);
769                match delta {
770                    ContentDelta::SignatureDelta { signature } => {
771                        assert_eq!(signature, "ErUBCkYICBgCIkCaGbqC85F4");
772                    }
773                    _ => panic!("Expected SignatureDelta"),
774                }
775            }
776            _ => panic!("Expected ContentBlockDelta event"),
777        }
778    }
779
780    #[test]
781    fn test_handle_thinking_delta_event() {
782        let event = StreamingEvent::ContentBlockDelta {
783            index: 0,
784            delta: ContentDelta::ThinkingDelta {
785                thinking: "Analyzing the request...".to_string(),
786            },
787        };
788
789        let mut tool_call_state = None;
790        let mut thinking_state = None;
791        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
792
793        assert!(result.is_some());
794        let choice = result.unwrap().unwrap();
795
796        match choice {
797            RawStreamingChoice::ReasoningDelta { id, reasoning, .. } => {
798                assert_eq!(id, None);
799                assert_eq!(reasoning, "Analyzing the request...");
800            }
801            _ => panic!("Expected ReasoningDelta choice"),
802        }
803
804        // Verify thinking state was updated
805        assert!(thinking_state.is_some());
806        assert_eq!(thinking_state.unwrap().thinking, "Analyzing the request...");
807    }
808
809    #[test]
810    fn test_handle_signature_delta_event() {
811        let event = StreamingEvent::ContentBlockDelta {
812            index: 0,
813            delta: ContentDelta::SignatureDelta {
814                signature: "test_signature".to_string(),
815            },
816        };
817
818        let mut tool_call_state = None;
819        let mut thinking_state = None;
820        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
821
822        // SignatureDelta should not yield anything (returns None)
823        assert!(result.is_none());
824
825        // But signature should be captured in thinking state
826        assert!(thinking_state.is_some());
827        assert_eq!(thinking_state.unwrap().signature, "test_signature");
828    }
829
830    #[test]
831    fn test_handle_redacted_thinking_content_block_start_event() {
832        let event = StreamingEvent::ContentBlockStart {
833            index: 0,
834            content_block: Content::RedactedThinking {
835                data: "redacted_blob".to_string(),
836            },
837        };
838        let mut tool_call_state = None;
839        let mut thinking_state = None;
840        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
841
842        assert!(result.is_some());
843        match result.unwrap().unwrap() {
844            RawStreamingChoice::Reasoning {
845                content: ReasoningContent::Redacted { data },
846                ..
847            } => {
848                assert_eq!(data, "redacted_blob");
849            }
850            _ => panic!("Expected Redacted reasoning chunk"),
851        }
852    }
853
854    #[test]
855    fn test_handle_text_delta_event() {
856        let event = StreamingEvent::ContentBlockDelta {
857            index: 0,
858            delta: ContentDelta::TextDelta {
859                text: "Hello, world!".to_string(),
860            },
861        };
862
863        let mut tool_call_state = None;
864        let mut thinking_state = None;
865        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
866
867        assert!(result.is_some());
868        let choice = result.unwrap().unwrap();
869
870        match choice {
871            RawStreamingChoice::Message(text) => {
872                assert_eq!(text, "Hello, world!");
873            }
874            _ => panic!("Expected Message choice"),
875        }
876    }
877
878    #[test]
879    fn test_handle_text_block_start_event() {
880        let event = StreamingEvent::ContentBlockStart {
881            index: 0,
882            content_block: Content::Text {
883                text: String::new(),
884                citations: Vec::new(),
885                cache_control: None,
886            },
887        };
888
889        let mut tool_call_state = None;
890        let mut thinking_state = None;
891        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
892
893        assert!(result.is_some());
894        let choice = result.unwrap().unwrap();
895        assert!(matches!(
896            choice,
897            RawStreamingChoice::TextStart {
898                additional_params: None
899            }
900        ));
901    }
902
903    #[test]
904    fn test_thinking_delta_does_not_interfere_with_tool_calls() {
905        // Thinking deltas should still be processed even if a tool call is in progress
906        let event = StreamingEvent::ContentBlockDelta {
907            index: 0,
908            delta: ContentDelta::ThinkingDelta {
909                thinking: "Thinking while tool is active...".to_string(),
910            },
911        };
912
913        let mut tool_call_state = Some(ToolCallState {
914            name: "test_tool".to_string(),
915            id: "tool_123".to_string(),
916            internal_call_id: nanoid::nanoid!(),
917            input_json: String::new(),
918        });
919        let mut thinking_state = None;
920
921        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
922
923        assert!(result.is_some());
924        let choice = result.unwrap().unwrap();
925
926        match choice {
927            RawStreamingChoice::ReasoningDelta { reasoning, .. } => {
928                assert_eq!(reasoning, "Thinking while tool is active...");
929            }
930            _ => panic!("Expected ReasoningDelta choice"),
931        }
932
933        // Tool call state should remain unchanged
934        assert!(tool_call_state.is_some());
935    }
936
937    #[test]
938    fn test_handle_input_json_delta_event() {
939        let event = StreamingEvent::ContentBlockDelta {
940            index: 0,
941            delta: ContentDelta::InputJsonDelta {
942                partial_json: "{\"arg\":\"value".to_string(),
943            },
944        };
945
946        let mut tool_call_state = Some(ToolCallState {
947            name: "test_tool".to_string(),
948            id: "tool_123".to_string(),
949            internal_call_id: nanoid::nanoid!(),
950            input_json: String::new(),
951        });
952        let mut thinking_state = None;
953
954        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
955
956        // Should emit a ToolCallDelta
957        assert!(result.is_some());
958        let choice = result.unwrap().unwrap();
959
960        match choice {
961            RawStreamingChoice::ToolCallDelta {
962                id,
963                internal_call_id: _,
964                content,
965            } => {
966                assert_eq!(id, "tool_123");
967                match content {
968                    ToolCallDeltaContent::Delta(delta) => assert_eq!(delta, "{\"arg\":\"value"),
969                    _ => panic!("Expected Delta content"),
970                }
971            }
972            _ => panic!("Expected ToolCallDelta choice, got {:?}", choice),
973        }
974
975        // Verify the input_json was accumulated
976        assert!(tool_call_state.is_some());
977        let state = tool_call_state.unwrap();
978        assert_eq!(state.input_json, "{\"arg\":\"value");
979    }
980
981    #[test]
982    fn test_tool_call_accumulation_with_multiple_deltas() {
983        let mut tool_call_state = Some(ToolCallState {
984            name: "test_tool".to_string(),
985            id: "tool_123".to_string(),
986            internal_call_id: nanoid::nanoid!(),
987            input_json: String::new(),
988        });
989        let mut thinking_state = None;
990
991        // First delta
992        let event1 = StreamingEvent::ContentBlockDelta {
993            index: 0,
994            delta: ContentDelta::InputJsonDelta {
995                partial_json: "{\"location\":".to_string(),
996            },
997        };
998        let result1 = handle_event(&event1, &mut tool_call_state, &mut thinking_state);
999        assert!(result1.is_some());
1000
1001        // Second delta
1002        let event2 = StreamingEvent::ContentBlockDelta {
1003            index: 0,
1004            delta: ContentDelta::InputJsonDelta {
1005                partial_json: "\"Paris\",".to_string(),
1006            },
1007        };
1008        let result2 = handle_event(&event2, &mut tool_call_state, &mut thinking_state);
1009        assert!(result2.is_some());
1010
1011        // Third delta
1012        let event3 = StreamingEvent::ContentBlockDelta {
1013            index: 0,
1014            delta: ContentDelta::InputJsonDelta {
1015                partial_json: "\"temp\":\"20C\"}".to_string(),
1016            },
1017        };
1018        let result3 = handle_event(&event3, &mut tool_call_state, &mut thinking_state);
1019        assert!(result3.is_some());
1020
1021        // Verify accumulated JSON
1022        assert!(tool_call_state.is_some());
1023        let state = tool_call_state.as_ref().unwrap();
1024        assert_eq!(
1025            state.input_json,
1026            "{\"location\":\"Paris\",\"temp\":\"20C\"}"
1027        );
1028
1029        // Final ContentBlockStop should emit complete tool call
1030        let stop_event = StreamingEvent::ContentBlockStop { index: 0 };
1031        let final_result = handle_event(&stop_event, &mut tool_call_state, &mut thinking_state);
1032        assert!(final_result.is_some());
1033
1034        match final_result.unwrap().unwrap() {
1035            RawStreamingChoice::ToolCall(RawStreamingToolCall {
1036                id,
1037                name,
1038                arguments,
1039                ..
1040            }) => {
1041                assert_eq!(id, "tool_123");
1042                assert_eq!(name, "test_tool");
1043                assert_eq!(
1044                    arguments.get("location").unwrap().as_str().unwrap(),
1045                    "Paris"
1046                );
1047                assert_eq!(arguments.get("temp").unwrap().as_str().unwrap(), "20C");
1048            }
1049            other => panic!("Expected ToolCall, got {:?}", other),
1050        }
1051
1052        // Tool call state should be taken
1053        assert!(tool_call_state.is_none());
1054    }
1055
1056    #[test]
1057    fn test_citations_delta_streaming_event_deserialization() {
1058        let json = r#"{
1059            "type": "content_block_delta",
1060            "index": 0,
1061            "delta": {
1062                "type": "citations_delta",
1063                "citation": {
1064                    "type": "char_location",
1065                    "cited_text": "The grass is green.",
1066                    "document_index": 0,
1067                    "document_title": "Example",
1068                    "start_char_index": 0,
1069                    "end_char_index": 20
1070                }
1071            }
1072        }"#;
1073
1074        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1075        let StreamingEvent::ContentBlockDelta { index, delta } = event else {
1076            panic!("expected ContentBlockDelta");
1077        };
1078        assert_eq!(index, 0);
1079        let ContentDelta::CitationsDelta { citation } = delta else {
1080            panic!("expected CitationsDelta");
1081        };
1082        let crate::providers::anthropic::completion::Citation::CharLocation {
1083            start_char_index,
1084            end_char_index,
1085            ..
1086        } = citation
1087        else {
1088            panic!("expected CharLocation");
1089        };
1090        assert_eq!(start_char_index, 0);
1091        assert_eq!(end_char_index, 20);
1092    }
1093
1094    #[test]
1095    fn test_search_result_citations_delta_streaming_event_deserialization() {
1096        let json = r#"{
1097            "type": "content_block_delta",
1098            "index": 0,
1099            "delta": {
1100                "type": "citations_delta",
1101                "citation": {
1102                    "type": "search_result_location",
1103                    "cited_text": "API requests require a key.",
1104                    "source": "https://docs.example.com/api-reference",
1105                    "title": "API Reference",
1106                    "search_result_index": 0,
1107                    "start_block_index": 0,
1108                    "end_block_index": 1
1109                }
1110            }
1111        }"#;
1112
1113        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1114        let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1115            panic!("expected ContentBlockDelta");
1116        };
1117        let ContentDelta::CitationsDelta { citation } = delta else {
1118            panic!("expected CitationsDelta");
1119        };
1120        assert!(matches!(
1121            citation,
1122            crate::providers::anthropic::completion::Citation::SearchResultLocation {
1123                search_result_index: 0,
1124                start_block_index: 0,
1125                end_block_index: 1,
1126                ..
1127            }
1128        ));
1129    }
1130
1131    #[test]
1132    fn test_web_search_result_citations_delta_streaming_event_deserialization() {
1133        let json = r#"{
1134            "type": "content_block_delta",
1135            "index": 0,
1136            "delta": {
1137                "type": "citations_delta",
1138                "citation": {
1139                    "type": "web_search_result_location",
1140                    "cited_text": "Claude Shannon was a mathematician.",
1141                    "url": "https://example.com/shannon",
1142                    "title": "Claude Shannon",
1143                    "encrypted_index": "encrypted-reference"
1144                }
1145            }
1146        }"#;
1147
1148        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1149        let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1150            panic!("expected ContentBlockDelta");
1151        };
1152        let ContentDelta::CitationsDelta { citation } = delta else {
1153            panic!("expected CitationsDelta");
1154        };
1155        assert!(matches!(
1156            citation,
1157            crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1158                ref url,
1159                ref encrypted_index,
1160                ..
1161            } if url == "https://example.com/shannon"
1162                && encrypted_index == "encrypted-reference"
1163        ));
1164    }
1165
1166    #[test]
1167    fn test_web_search_result_citations_delta_allows_null_title() {
1168        let json = r#"{
1169            "type": "content_block_delta",
1170            "index": 0,
1171            "delta": {
1172                "type": "citations_delta",
1173                "citation": {
1174                    "type": "web_search_result_location",
1175                    "cited_text": "Claude Shannon was a mathematician.",
1176                    "url": "https://example.com/shannon",
1177                    "title": null,
1178                    "encrypted_index": "encrypted-reference"
1179                }
1180            }
1181        }"#;
1182
1183        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1184        let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1185            panic!("expected ContentBlockDelta");
1186        };
1187        let ContentDelta::CitationsDelta { citation } = delta else {
1188            panic!("expected CitationsDelta");
1189        };
1190        assert!(matches!(
1191            citation,
1192            crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1193                title: None,
1194                ..
1195            }
1196        ));
1197    }
1198
1199    #[test]
1200    fn test_web_search_content_block_start_events_deserialize() {
1201        let server_tool_use = r#"{
1202            "type": "content_block_start",
1203            "index": 1,
1204            "content_block": {
1205                "type": "server_tool_use",
1206                "id": "srvtoolu_01",
1207                "name": "web_search",
1208                "input": {
1209                    "query": "claude shannon birth date"
1210                }
1211            }
1212        }"#;
1213        let event: StreamingEvent = serde_json::from_str(server_tool_use).unwrap();
1214        assert!(matches!(
1215            event,
1216            StreamingEvent::ContentBlockStart {
1217                content_block: Content::ServerToolUse {
1218                    ref id,
1219                    ref name,
1220                    ref input
1221                },
1222                ..
1223            } if id == "srvtoolu_01"
1224                && name == "web_search"
1225                && input["query"] == "claude shannon birth date"
1226        ));
1227
1228        let web_search_tool_result = r#"{
1229            "type": "content_block_start",
1230            "index": 2,
1231            "content_block": {
1232                "type": "web_search_tool_result",
1233                "tool_use_id": "srvtoolu_01",
1234                "content": [{
1235                    "type": "web_search_result",
1236                    "url": "https://example.com/shannon",
1237                    "title": "Claude Shannon",
1238                    "encrypted_content": "encrypted-content"
1239                }]
1240            }
1241        }"#;
1242        let event: StreamingEvent = serde_json::from_str(web_search_tool_result).unwrap();
1243        assert!(matches!(
1244            event,
1245            StreamingEvent::ContentBlockStart {
1246                content_block: Content::WebSearchToolResult {
1247                    ref tool_use_id,
1248                    ref content
1249                },
1250                ..
1251            } if tool_use_id == "srvtoolu_01"
1252                && content[0]["encrypted_content"] == "encrypted-content"
1253        ));
1254    }
1255
1256    #[tokio::test]
1257    async fn test_streaming_web_search_blocks_are_preserved_on_final_choice() {
1258        let raw_stream = stream! {
1259            let mut tool_call_state = None;
1260            let mut server_tool_uses = HashMap::new();
1261            let mut thinking_state = None;
1262
1263            let server_tool_use_start = super::handle_event(
1264                &StreamingEvent::ContentBlockStart {
1265                    index: 0,
1266                    content_block: Content::ServerToolUse {
1267                        id: "srvtoolu_01".to_string(),
1268                        name: "web_search".to_string(),
1269                        input: serde_json::Value::Null,
1270                    },
1271                },
1272                &mut tool_call_state,
1273                &mut server_tool_uses,
1274                &mut thinking_state,
1275            );
1276            assert!(
1277                server_tool_use_start.is_none(),
1278                "server_tool_use start should be accumulated until its input JSON is complete"
1279            );
1280
1281            let server_tool_use_delta = super::handle_event(
1282                &StreamingEvent::ContentBlockDelta {
1283                    index: 0,
1284                    delta: ContentDelta::InputJsonDelta {
1285                        partial_json: r#"{"query":"claude shannon birth date"}"#.to_string(),
1286                    },
1287                },
1288                &mut tool_call_state,
1289                &mut server_tool_uses,
1290                &mut thinking_state,
1291            );
1292            assert!(
1293                server_tool_use_delta.is_none(),
1294                "server_tool_use input JSON should not be emitted as a Rig tool-call delta"
1295            );
1296
1297            yield super::handle_event(
1298                &StreamingEvent::ContentBlockStop { index: 0 },
1299                &mut tool_call_state,
1300                &mut server_tool_uses,
1301                &mut thinking_state,
1302            )
1303            .expect("server_tool_use stop should produce completed raw metadata");
1304
1305            yield super::handle_event(
1306                &StreamingEvent::ContentBlockStart {
1307                    index: 1,
1308                    content_block: Content::WebSearchToolResult {
1309                        tool_use_id: "srvtoolu_01".to_string(),
1310                        content: serde_json::json!([{
1311                            "type": "web_search_result",
1312                            "url": "https://example.com/shannon",
1313                            "title": "Claude Shannon",
1314                            "encrypted_content": "encrypted-content"
1315                        }]),
1316                    },
1317                },
1318                &mut tool_call_state,
1319                &mut server_tool_uses,
1320                &mut thinking_state,
1321            )
1322            .expect("web_search_tool_result block should produce raw metadata");
1323
1324            yield super::handle_event(
1325                &StreamingEvent::ContentBlockStart {
1326                    index: 2,
1327                    content_block: Content::Text {
1328                        text: String::new(),
1329                        citations: Vec::new(),
1330                        cache_control: None,
1331                    },
1332                },
1333                &mut tool_call_state,
1334                &mut server_tool_uses,
1335                &mut thinking_state,
1336            )
1337            .expect("text block start should produce a raw choice");
1338
1339            yield super::handle_event(
1340                &StreamingEvent::ContentBlockDelta {
1341                    index: 2,
1342                    delta: ContentDelta::TextDelta {
1343                        text: "Claude Shannon was born on April 30, 1916.".to_string(),
1344                    },
1345                },
1346                &mut tool_call_state,
1347                &mut server_tool_uses,
1348                &mut thinking_state,
1349            )
1350            .expect("text delta should produce a raw choice");
1351
1352            yield super::handle_event(
1353                &StreamingEvent::ContentBlockDelta {
1354                    index: 2,
1355                    delta: ContentDelta::CitationsDelta {
1356                        citation: crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1357                            cited_text: "Claude Shannon was born on April 30, 1916.".to_string(),
1358                            url: "https://example.com/shannon".to_string(),
1359                            title: Some("Claude Shannon".to_string()),
1360                            encrypted_index: "encrypted-index".to_string(),
1361                        },
1362                    },
1363                },
1364                &mut tool_call_state,
1365                &mut server_tool_uses,
1366                &mut thinking_state,
1367            )
1368            .expect("citation delta should produce a raw choice");
1369
1370            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1371                usage: PartialUsage::default(),
1372            }));
1373        };
1374
1375        let mut stream =
1376            crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1377        while stream.next().await.is_some() {}
1378
1379        let choice_items: Vec<crate::message::AssistantContent> =
1380            stream.choice.clone().into_iter().collect();
1381        assert_eq!(choice_items.len(), 3);
1382        assert!(
1383            choice_items
1384                .iter()
1385                .all(|item| !matches!(item, crate::message::AssistantContent::ToolCall(_))),
1386            "provider-owned web-search blocks must not become Rig client tool calls"
1387        );
1388
1389        let Some(crate::message::AssistantContent::Text(server_tool_use)) = choice_items.first()
1390        else {
1391            panic!("expected raw server_tool_use metadata");
1392        };
1393        assert_eq!(
1394            server_tool_use.additional_params.as_ref().unwrap()
1395                [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["type"],
1396            "server_tool_use"
1397        );
1398        assert_eq!(
1399            server_tool_use.additional_params.as_ref().unwrap()
1400                [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["input"]["query"],
1401            "claude shannon birth date"
1402        );
1403
1404        let Some(crate::message::AssistantContent::Text(web_search_result)) = choice_items.get(1)
1405        else {
1406            panic!("expected raw web_search_tool_result metadata");
1407        };
1408        assert_eq!(
1409            web_search_result.additional_params.as_ref().unwrap()
1410                [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["content"][0]
1411                ["encrypted_content"],
1412            "encrypted-content"
1413        );
1414
1415        let Some(crate::message::AssistantContent::Text(answer)) = choice_items.get(2) else {
1416            panic!("expected answer text");
1417        };
1418        assert_eq!(answer.text, "Claude Shannon was born on April 30, 1916.");
1419        let citations = crate::providers::anthropic::completion::anthropic_citations(answer)
1420            .expect("expected preserved citations");
1421        assert!(matches!(
1422            citations.first(),
1423            Some(crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1424                encrypted_index,
1425                ..
1426            }) if encrypted_index == "encrypted-index"
1427        ));
1428    }
1429
1430    #[test]
1431    fn test_handle_citations_delta_event_preserves_metadata() {
1432        let event = StreamingEvent::ContentBlockDelta {
1433            index: 0,
1434            delta: ContentDelta::CitationsDelta {
1435                citation: crate::providers::anthropic::completion::Citation::CharLocation {
1436                    cited_text: "The grass is green.".to_string(),
1437                    document_index: 0,
1438                    document_title: Some("Example".to_string()),
1439                    start_char_index: 0,
1440                    end_char_index: 20,
1441                },
1442            },
1443        };
1444
1445        let mut tool_call_state = None;
1446        let mut thinking_state = None;
1447        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
1448
1449        assert!(result.is_some());
1450        let choice = result.unwrap().unwrap();
1451        let RawStreamingChoice::TextAdditionalParams(additional_params) = choice else {
1452            panic!("expected TextAdditionalParams choice");
1453        };
1454        assert_eq!(additional_params["citations"][0]["type"], "char_location");
1455    }
1456
1457    #[tokio::test]
1458    async fn test_streaming_citation_deltas_are_preserved_on_final_text() {
1459        let citation = crate::providers::anthropic::completion::Citation::CharLocation {
1460            cited_text: "The grass is green.".to_string(),
1461            document_index: 0,
1462            document_title: Some("Example".to_string()),
1463            start_char_index: 0,
1464            end_char_index: 20,
1465        };
1466
1467        let raw_stream = stream! {
1468            let mut tool_call_state = None;
1469            let mut thinking_state = None;
1470
1471            yield handle_event(
1472                &StreamingEvent::ContentBlockStart {
1473                    index: 0,
1474                    content_block: Content::Text {
1475                        text: String::new(),
1476                        citations: Vec::new(),
1477                        cache_control: None,
1478                    },
1479                },
1480                &mut tool_call_state,
1481                &mut thinking_state,
1482            )
1483            .expect("text block start should produce a raw choice");
1484
1485            yield handle_event(
1486                &StreamingEvent::ContentBlockDelta {
1487                    index: 0,
1488                    delta: ContentDelta::TextDelta {
1489                        text: "the grass is green".to_string(),
1490                    },
1491                },
1492                &mut tool_call_state,
1493                &mut thinking_state,
1494            )
1495            .expect("text delta should produce a raw choice");
1496
1497            yield handle_event(
1498                &StreamingEvent::ContentBlockDelta {
1499                    index: 0,
1500                    delta: ContentDelta::CitationsDelta {
1501                        citation: crate::providers::anthropic::completion::Citation::CharLocation {
1502                            cited_text: "The grass is green.".to_string(),
1503                            document_index: 0,
1504                            document_title: Some("Example".to_string()),
1505                            start_char_index: 0,
1506                            end_char_index: 20,
1507                        },
1508                    },
1509                },
1510                &mut tool_call_state,
1511                &mut thinking_state,
1512            )
1513            .expect("citation delta should produce a raw choice");
1514
1515            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1516                usage: PartialUsage::default(),
1517            }));
1518        };
1519
1520        let mut stream =
1521            crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1522        while stream.next().await.is_some() {}
1523
1524        let choice_items: Vec<crate::message::AssistantContent> =
1525            stream.choice.clone().into_iter().collect();
1526        let Some(crate::message::AssistantContent::Text(text)) = choice_items.first() else {
1527            panic!("expected accumulated text item");
1528        };
1529
1530        assert_eq!(text.text, "the grass is green");
1531        let citations = crate::providers::anthropic::completion::anthropic_citations(text).unwrap();
1532        assert_eq!(citations, vec![citation]);
1533    }
1534
1535    #[test]
1536    fn test_unknown_content_delta_falls_back() {
1537        let json = r#"{"type": "something_new_from_anthropic", "field": "x"}"#;
1538        let delta: ContentDelta = serde_json::from_str(json).unwrap();
1539        assert!(matches!(delta, ContentDelta::Unknown));
1540    }
1541}