Skip to main content

rig_core/providers/anthropic/
streaming.rs

1use async_stream::stream;
2use futures::StreamExt;
3use serde::{Deserialize, Serialize};
4use serde_json::{Value, json};
5use tracing::{Level, enabled, info_span};
6use tracing_futures::Instrument;
7
8use super::completion::{
9    AnthropicCompatibleProvider, Content, GenericCompletionModel, Message, SystemContent,
10    ToolChoice, Usage, apply_prompt_cache_control, build_tool_definitions,
11    resolve_top_level_cache_control, split_system_messages_from_history,
12};
13use crate::completion::{CompletionError, CompletionRequest, GetTokenUsage};
14use crate::http_client::sse::{Event, GenericEventSource};
15use crate::http_client::{self, HttpClientExt};
16use crate::json_utils::merge_inplace;
17use crate::message::ReasoningContent;
18use crate::streaming::{
19    self, RawStreamingChoice, RawStreamingToolCall, StreamingResult, ToolCallDeltaContent,
20};
21use crate::telemetry::SpanCombinator;
22use crate::wasm_compat::{WasmCompatSend, WasmCompatSync};
23use std::collections::HashMap;
24
/// One event payload received over the streaming (SSE) connection.
///
/// Deserialized from JSON using the `type` field as the discriminant, with
/// variant names mapped to `snake_case` (for example `content_block_delta`).
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum StreamingEvent {
    /// Opens a message and carries its envelope (id, model, initial usage).
    MessageStart {
        message: MessageStart,
    },
    /// Opens the content block at `index`.
    ContentBlockStart {
        index: usize,
        content_block: Content,
    },
    /// Incremental update for the content block at `index`.
    ContentBlockDelta {
        index: usize,
        delta: ContentDelta,
    },
    /// Closes the content block at `index`.
    ContentBlockStop {
        index: usize,
    },
    /// Message-level update carrying stop information and usage counters.
    MessageDelta {
        delta: MessageDelta,
        usage: PartialUsage,
    },
    MessageStop,
    Ping,
    /// Catch-all for any `type` value that is not one of the variants above.
    #[serde(other)]
    Unknown,
}
51
/// Message envelope carried by [`StreamingEvent::MessageStart`].
///
/// Every field except `stop_reason` and `stop_sequence` must be present for
/// deserialization to succeed.
#[derive(Debug, Deserialize)]
pub struct MessageStart {
    pub id: String,
    pub role: String,
    pub content: Vec<Content>,
    pub model: String,
    pub stop_reason: Option<String>,
    pub stop_sequence: Option<String>,
    /// Usage reported when the message opens; the streaming loop reads
    /// `input_tokens` from here.
    pub usage: Usage,
}
62
/// Incremental payload of a [`StreamingEvent::ContentBlockDelta`].
///
/// Discriminated by the JSON `type` field, with variant names in `snake_case`
/// (for example `text_delta`, `input_json_delta`).
#[derive(Debug, Deserialize)]
#[serde(tag = "type", rename_all = "snake_case")]
pub enum ContentDelta {
    /// A fragment of assistant text.
    TextDelta {
        text: String,
    },
    /// A fragment of a tool call's JSON input; fragments are concatenated
    /// and parsed once the block stops.
    InputJsonDelta {
        partial_json: String,
    },
    /// A fragment of thinking text.
    ThinkingDelta {
        thinking: String,
    },
    /// A fragment of the signature attached to a thinking block.
    SignatureDelta {
        signature: String,
    },
    /// A single citation for the current text block.
    CitationsDelta {
        citation: super::completion::Citation,
    },
    /// Forward-compatibility fallback. Any delta type Anthropic adds in the
    /// future that this crate does not yet model deserializes here so the
    /// surrounding [`StreamingEvent`] still parses.
    #[serde(other)]
    Unknown,
}
87
/// Stop information carried by [`StreamingEvent::MessageDelta`].
///
/// The streaming loop treats a delta whose `stop_reason` is set as the end of
/// the response.
#[derive(Debug, Deserialize)]
pub struct MessageDelta {
    pub stop_reason: Option<String>,
    pub stop_sequence: Option<String>,
}
93
/// Token counters as reported on streamed usage payloads.
///
/// Only `output_tokens` is required when deserializing; the remaining
/// counters fall back to `None` when absent.
#[derive(Debug, Deserialize, Clone, Serialize, Default)]
pub struct PartialUsage {
    pub output_tokens: usize,
    #[serde(default)]
    pub input_tokens: Option<usize>,
    #[serde(default)]
    pub cache_creation_input_tokens: Option<u64>,
    #[serde(default)]
    pub cache_read_input_tokens: Option<u64>,
}
104
105impl GetTokenUsage for PartialUsage {
106    fn token_usage(&self) -> Option<crate::completion::Usage> {
107        let mut usage = crate::completion::Usage::new();
108
109        usage.input_tokens = self.input_tokens.unwrap_or_default() as u64;
110        usage.output_tokens = self.output_tokens as u64;
111        usage.cached_input_tokens = self.cache_read_input_tokens.unwrap_or(0);
112        usage.cache_creation_input_tokens = self.cache_creation_input_tokens.unwrap_or(0);
113        usage.total_tokens = usage.input_tokens
114            + usage.cached_input_tokens
115            + usage.cache_creation_input_tokens
116            + usage.output_tokens;
117        Some(usage)
118    }
119}
120
/// Accumulator for the client-side tool call that is currently streaming.
#[derive(Default)]
struct ToolCallState {
    /// Tool name taken from the opening `ToolUse` content block.
    name: String,
    /// Tool-use id taken from the opening `ToolUse` content block.
    id: String,
    /// Locally generated id (via `nanoid`) attached to every delta and to the
    /// final tool call.
    internal_call_id: String,
    /// Concatenated `partial_json` fragments; parsed when the block stops.
    input_json: String,
}
128
/// Accumulator for a `ServerToolUse` content block, tracked per block index.
struct ServerToolUseState {
    name: String,
    id: String,
    /// Input carried by the block-start event; used when no JSON fragments
    /// were streamed for the block.
    initial_input: Value,
    /// Concatenated `partial_json` fragments; parsed when the block stops.
    input_json: String,
}
135
/// Accumulator for the thinking block that is currently streaming.
#[derive(Default)]
struct ThinkingState {
    /// Concatenated `thinking_delta` fragments.
    thinking: String,
    /// Concatenated `signature_delta` fragments; may stay empty.
    signature: String,
}
141
/// Final payload yielded at the end of a streamed completion; carries the
/// token usage gathered while streaming.
#[derive(Clone, Debug, Deserialize, Serialize)]
pub struct StreamingCompletionResponse {
    pub usage: PartialUsage,
}
146
147impl GetTokenUsage for StreamingCompletionResponse {
148    fn token_usage(&self) -> Option<crate::completion::Usage> {
149        let mut usage = crate::completion::Usage::new();
150        usage.input_tokens = self.usage.input_tokens.unwrap_or(0) as u64;
151        usage.output_tokens = self.usage.output_tokens as u64;
152        usage.cached_input_tokens = self.usage.cache_read_input_tokens.unwrap_or(0);
153        usage.cache_creation_input_tokens = self.usage.cache_creation_input_tokens.unwrap_or(0);
154        usage.total_tokens = usage.input_tokens
155            + usage.cached_input_tokens
156            + usage.cache_creation_input_tokens
157            + usage.output_tokens;
158
159        Some(usage)
160    }
161}
162
163impl<Ext, T> GenericCompletionModel<Ext, T>
164where
165    T: HttpClientExt + Clone + Default + 'static,
166    Ext: AnthropicCompatibleProvider + Clone + WasmCompatSend + WasmCompatSync + 'static,
167{
168    pub(crate) async fn stream(
169        &self,
170        mut completion_request: CompletionRequest,
171    ) -> Result<streaming::StreamingCompletionResponse<StreamingCompletionResponse>, CompletionError>
172    {
173        let request_model = completion_request
174            .model
175            .clone()
176            .unwrap_or_else(|| self.model.clone());
177        let span = if tracing::Span::current().is_disabled() {
178            info_span!(
179                target: "rig::completions",
180                "chat_streaming",
181                gen_ai.operation.name = "chat_streaming",
182                gen_ai.provider.name = Ext::PROVIDER_NAME,
183                gen_ai.request.model = &request_model,
184                gen_ai.system_instructions = &completion_request.preamble,
185                gen_ai.response.id = tracing::field::Empty,
186                gen_ai.response.model = &request_model,
187                gen_ai.usage.output_tokens = tracing::field::Empty,
188                gen_ai.usage.input_tokens = tracing::field::Empty,
189                gen_ai.usage.cache_read.input_tokens = tracing::field::Empty,
190                gen_ai.usage.cache_creation.input_tokens = tracing::field::Empty,
191                gen_ai.input.messages = tracing::field::Empty,
192                gen_ai.output.messages = tracing::field::Empty,
193            )
194        } else {
195            tracing::Span::current()
196        };
197        let max_tokens = if let Some(tokens) = completion_request.max_tokens {
198            tokens
199        } else if let Some(tokens) = self.default_max_tokens {
200            tokens
201        } else {
202            return Err(CompletionError::RequestError(
203                "`max_tokens` must be set for Anthropic".into(),
204            ));
205        };
206
207        let mut full_history = vec![];
208        if let Some(docs) = completion_request.normalized_documents() {
209            full_history.push(docs);
210        }
211        full_history.extend(completion_request.chat_history);
212        let (history_system, full_history) = split_system_messages_from_history(full_history);
213
214        let mut messages = full_history
215            .into_iter()
216            .map(Message::try_from)
217            .collect::<Result<Vec<Message>, _>>()?;
218
219        // Convert system prompt to array format for cache_control support
220        let mut system: Vec<SystemContent> =
221            if let Some(preamble) = completion_request.preamble.as_ref() {
222                if preamble.is_empty() {
223                    vec![]
224                } else {
225                    vec![SystemContent::Text {
226                        text: preamble.clone(),
227                        cache_control: None,
228                    }]
229                }
230            } else {
231                vec![]
232            };
233        system.extend(history_system);
234
235        let mut additional_params_payload = completion_request
236            .additional_params
237            .take()
238            .unwrap_or(Value::Null);
239        let top_level_cache_control = resolve_top_level_cache_control(
240            self.automatic_caching,
241            self.automatic_caching_ttl.clone(),
242            &mut additional_params_payload,
243        )?;
244        let mut tools =
245            build_tool_definitions(completion_request.tools, &mut additional_params_payload)?;
246
247        apply_prompt_cache_control(
248            &mut system,
249            &mut messages,
250            &mut tools,
251            self.prompt_caching,
252            top_level_cache_control.as_ref(),
253        )?;
254
255        let mut body = json!({
256            "model": request_model,
257            "messages": messages,
258            "max_tokens": max_tokens,
259            "stream": true,
260        });
261
262        // Automatic caching: one top-level field; the API moves the breakpoint automatically.
263        // No beta header is required.
264        if let Some(cache_control) = top_level_cache_control {
265            merge_inplace(
266                &mut body,
267                json!({ "cache_control": serde_json::to_value(&cache_control)? }),
268            );
269        }
270
271        // Add system prompt if non-empty
272        if !system.is_empty() {
273            merge_inplace(&mut body, json!({ "system": system }));
274        }
275
276        if let Some(temperature) = completion_request.temperature {
277            merge_inplace(&mut body, json!({ "temperature": temperature }));
278        }
279
280        if !tools.is_empty() {
281            merge_inplace(
282                &mut body,
283                json!({
284                    "tools": tools,
285                    "tool_choice": ToolChoice::Auto,
286                }),
287            );
288        }
289
290        if !additional_params_payload.is_null() {
291            merge_inplace(&mut body, additional_params_payload)
292        }
293
294        if enabled!(Level::TRACE) {
295            tracing::trace!(
296                target: "rig::completions",
297                "Anthropic completion request: {}",
298                serde_json::to_string_pretty(&body)?
299            );
300        }
301
302        let body: Vec<u8> = serde_json::to_vec(&body)?;
303
304        let req = self
305            .client
306            .post("/v1/messages")?
307            .body(body)
308            .map_err(http_client::Error::Protocol)?;
309
310        let stream = GenericEventSource::new(self.client.clone(), req);
311
312        // Use our SSE decoder to directly handle Server-Sent Events format
313        let stream: StreamingResult<StreamingCompletionResponse> = Box::pin(stream! {
314            let mut current_tool_call: Option<ToolCallState> = None;
315            let mut server_tool_uses: HashMap<usize, ServerToolUseState> = HashMap::new();
316            let mut current_thinking: Option<ThinkingState> = None;
317            let mut sse_stream = Box::pin(stream);
318            let mut input_tokens = 0;
319            let mut final_usage = None;
320
321            let mut text_content = String::new();
322
323            while let Some(sse_result) = sse_stream.next().await {
324                match sse_result {
325                    Ok(Event::Open) => {}
326                    Ok(Event::Message(sse)) => {
327                        // Parse the SSE data as a StreamingEvent
328                        match serde_json::from_str::<StreamingEvent>(&sse.data) {
329                            Ok(event) => {
330                                match &event {
331                                    StreamingEvent::MessageStart { message } => {
332                                        input_tokens = message.usage.input_tokens;
333
334                                        let span = tracing::Span::current();
335                                        span.record("gen_ai.response.id", &message.id);
336                                        span.record("gen_ai.response.model", &message.model);
337                                    },
338                                    StreamingEvent::MessageDelta { delta, usage } => {
339                                        if delta.stop_reason.is_some() {
340                                            // cache_creation_input_tokens and cache_read_input_tokens
341                                            // are cumulative totals on message_delta.usage per the
342                                            // Anthropic streaming API spec — use them directly.
343                                            let usage = PartialUsage {
344                                                 output_tokens: usage.output_tokens,
345                                                 input_tokens: usize::try_from(input_tokens).ok(),
346                                                 cache_creation_input_tokens: usage.cache_creation_input_tokens,
347                                                 cache_read_input_tokens: usage.cache_read_input_tokens
348                                            };
349
350                                            let span = tracing::Span::current();
351                                            span.record_token_usage(&usage);
352                                            final_usage = Some(usage);
353                                            break;
354                                        }
355                                    }
356                                    _ => {}
357                                }
358
359                                if let Some(result) = handle_event(
360                                    &event,
361                                    &mut current_tool_call,
362                                    &mut server_tool_uses,
363                                    &mut current_thinking,
364                                ) {
365                                    if let Ok(RawStreamingChoice::Message(ref text)) = result {
366                                        text_content += text;
367                                    }
368                                    yield result;
369                                }
370                            },
371                            Err(e) => {
372                                if !sse.data.trim().is_empty() {
373                                    yield Err(CompletionError::ResponseError(
374                                        format!("Failed to parse JSON: {} (Data: {})", e, sse.data)
375                                    ));
376                                }
377                            }
378                        }
379                    },
380                    Err(e) => {
381                        yield Err(CompletionError::ProviderError(format!("SSE Error: {e}")));
382                        break;
383                    }
384                }
385            }
386
387            // Ensure event source is closed when stream ends
388            sse_stream.close();
389
390            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
391                usage: final_usage.unwrap_or_default()
392            }))
393        }.instrument(span));
394
395        Ok(streaming::StreamingCompletionResponse::stream(stream))
396    }
397}
398
399fn handle_event(
400    event: &StreamingEvent,
401    current_tool_call: &mut Option<ToolCallState>,
402    server_tool_uses: &mut HashMap<usize, ServerToolUseState>,
403    current_thinking: &mut Option<ThinkingState>,
404) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
405    match event {
406        StreamingEvent::ContentBlockDelta { index, delta } => match delta {
407            ContentDelta::TextDelta { text } => {
408                if current_tool_call.is_none() {
409                    return Some(Ok(RawStreamingChoice::Message(text.clone())));
410                }
411                None
412            }
413            ContentDelta::InputJsonDelta { partial_json } => {
414                if let Some(server_tool_use) = server_tool_uses.get_mut(index) {
415                    server_tool_use.input_json.push_str(partial_json);
416                    return None;
417                }
418
419                if let Some(tool_call) = current_tool_call {
420                    tool_call.input_json.push_str(partial_json);
421                    // Emit the delta so UI can show progress
422                    return Some(Ok(RawStreamingChoice::ToolCallDelta {
423                        id: tool_call.id.clone(),
424                        internal_call_id: tool_call.internal_call_id.clone(),
425                        content: ToolCallDeltaContent::Delta(partial_json.clone()),
426                    }));
427                }
428                None
429            }
430            ContentDelta::ThinkingDelta { thinking } => {
431                current_thinking
432                    .get_or_insert_with(ThinkingState::default)
433                    .thinking
434                    .push_str(thinking);
435
436                Some(Ok(RawStreamingChoice::ReasoningDelta {
437                    id: None,
438                    reasoning: thinking.clone(),
439                }))
440            }
441            ContentDelta::SignatureDelta { signature } => {
442                current_thinking
443                    .get_or_insert_with(ThinkingState::default)
444                    .signature
445                    .push_str(signature);
446
447                // Don't yield signature chunks, they will be included in the final Reasoning
448                None
449            }
450            ContentDelta::CitationsDelta { citation } => {
451                Some(Ok(RawStreamingChoice::TextAdditionalParams(json!({
452                    "citations": [citation]
453                }))))
454            }
455            ContentDelta::Unknown => None,
456        },
457        StreamingEvent::ContentBlockStart {
458            index,
459            content_block,
460        } => match content_block {
461            Content::Text { citations, .. } => {
462                let additional_params = (!citations.is_empty()).then(|| {
463                    json!({
464                        "citations": citations
465                    })
466                });
467                Some(Ok(RawStreamingChoice::TextStart { additional_params }))
468            }
469            Content::ServerToolUse { id, name, input } => {
470                server_tool_uses.insert(
471                    *index,
472                    ServerToolUseState {
473                        name: name.clone(),
474                        id: id.clone(),
475                        initial_input: input.clone(),
476                        input_json: String::new(),
477                    },
478                );
479                None
480            }
481            raw @ Content::WebSearchToolResult { .. } => Some(Ok(RawStreamingChoice::TextStart {
482                additional_params: Some(json!({
483                    super::completion::ANTHROPIC_RAW_CONTENT_KEY: raw
484                })),
485            })),
486            Content::ToolUse { id, name, .. } => {
487                let internal_call_id = nanoid::nanoid!();
488                *current_tool_call = Some(ToolCallState {
489                    name: name.clone(),
490                    id: id.clone(),
491                    internal_call_id: internal_call_id.clone(),
492                    input_json: String::new(),
493                });
494                Some(Ok(RawStreamingChoice::ToolCallDelta {
495                    id: id.clone(),
496                    internal_call_id,
497                    content: ToolCallDeltaContent::Name(name.clone()),
498                }))
499            }
500            Content::Thinking { .. } => {
501                *current_thinking = Some(ThinkingState::default());
502                None
503            }
504            Content::RedactedThinking { data } => Some(Ok(RawStreamingChoice::Reasoning {
505                id: None,
506                content: ReasoningContent::Redacted { data: data.clone() },
507            })),
508            // Handle other content types - they don't need special handling
509            _ => None,
510        },
511        StreamingEvent::ContentBlockStop { index } => {
512            if let Some(thinking_state) = Option::take(current_thinking)
513                && !thinking_state.thinking.is_empty()
514            {
515                let signature = if thinking_state.signature.is_empty() {
516                    None
517                } else {
518                    Some(thinking_state.signature)
519                };
520
521                return Some(Ok(RawStreamingChoice::Reasoning {
522                    id: None,
523                    content: ReasoningContent::Text {
524                        text: thinking_state.thinking,
525                        signature,
526                    },
527                }));
528            }
529
530            if let Some(server_tool_use) = server_tool_uses.remove(index) {
531                let input = if server_tool_use.input_json.is_empty() {
532                    if server_tool_use.initial_input.is_null() {
533                        json!({})
534                    } else {
535                        server_tool_use.initial_input
536                    }
537                } else {
538                    match serde_json::from_str(&server_tool_use.input_json) {
539                        Ok(json_value) => json_value,
540                        Err(e) => return Some(Err(CompletionError::from(e))),
541                    }
542                };
543
544                return Some(Ok(RawStreamingChoice::TextStart {
545                    additional_params: Some(json!({
546                        super::completion::ANTHROPIC_RAW_CONTENT_KEY: Content::ServerToolUse {
547                            id: server_tool_use.id,
548                            name: server_tool_use.name,
549                            input,
550                        }
551                    })),
552                }));
553            }
554
555            if let Some(tool_call) = Option::take(current_tool_call) {
556                let json_str = if tool_call.input_json.is_empty() {
557                    "{}"
558                } else {
559                    &tool_call.input_json
560                };
561                match serde_json::from_str(json_str) {
562                    Ok(json_value) => {
563                        let raw_tool_call =
564                            RawStreamingToolCall::new(tool_call.id, tool_call.name, json_value)
565                                .with_internal_call_id(tool_call.internal_call_id);
566                        Some(Ok(RawStreamingChoice::ToolCall(raw_tool_call)))
567                    }
568                    Err(e) => Some(Err(CompletionError::from(e))),
569                }
570            } else {
571                None
572            }
573        }
574        // Ignore other event types or handle as needed
575        StreamingEvent::MessageStart { .. }
576        | StreamingEvent::MessageDelta { .. }
577        | StreamingEvent::MessageStop
578        | StreamingEvent::Ping
579        | StreamingEvent::Unknown => None,
580    }
581}
582
583#[cfg(test)]
584mod tests {
585    use super::super::completion::{CacheControl, CacheTtl};
586    use super::*;
587    use async_stream::stream;
588    use futures::StreamExt;
589
    /// Boxes and pins `stream` into a `StreamingResult` (non-wasm build: the
    /// stream must also be `Send`).
    #[cfg(not(all(feature = "wasm", target_arch = "wasm32")))]
    fn to_stream_result(
        stream: impl futures::Stream<
            Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
        > + Send
        + 'static,
    ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
        Box::pin(stream)
    }
599
    /// Boxes and pins `stream` into a `StreamingResult` (wasm build: no
    /// `Send` bound is required).
    #[cfg(all(feature = "wasm", target_arch = "wasm32"))]
    fn to_stream_result(
        stream: impl futures::Stream<
            Item = Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>,
        > + 'static,
    ) -> crate::streaming::StreamingResult<StreamingCompletionResponse> {
        Box::pin(stream)
    }
608
609    #[test]
610    fn test_streaming_tool_build_marks_final_combined_tool() {
611        let mut additional_params = json!({
612            "tools": [{
613                "name": "provider_tool",
614                "description": "Provider tool",
615                "input_schema": {"type": "object"}
616            }]
617        });
618
619        let mut tools = build_tool_definitions(
620            vec![crate::completion::ToolDefinition {
621                name: "rig_tool".to_string(),
622                description: "Rig tool".to_string(),
623                parameters: json!({"type": "object", "properties": {}}),
624            }],
625            &mut additional_params,
626        )
627        .unwrap();
628        let mut system: Vec<SystemContent> = Vec::new();
629        let mut messages: Vec<Message> = Vec::new();
630        apply_prompt_cache_control(&mut system, &mut messages, &mut tools, true, None).unwrap();
631
632        assert_eq!(tools.len(), 2);
633        assert!(tools[0].get("cache_control").is_none());
634        assert_eq!(tools[1]["name"], "provider_tool");
635        assert_eq!(tools[1]["cache_control"]["type"], "ephemeral");
636    }
637
638    #[test]
639    fn test_streaming_prompt_cache_control_uses_raw_top_level_ttl() {
640        let mut additional_params = json!({
641            "cache_control": {"type": "ephemeral", "ttl": "1h"}
642        });
643        let top_level_cache_control =
644            resolve_top_level_cache_control(false, None, &mut additional_params).unwrap();
645        let mut tools = build_tool_definitions(
646            vec![crate::completion::ToolDefinition {
647                name: "rig_tool".to_string(),
648                description: "Rig tool".to_string(),
649                parameters: json!({"type": "object", "properties": {}}),
650            }],
651            &mut additional_params,
652        )
653        .unwrap();
654        let mut system = vec![SystemContent::Text {
655            text: "System prompt".to_string(),
656            cache_control: None,
657        }];
658        let mut messages: Vec<Message> = Vec::new();
659
660        apply_prompt_cache_control(
661            &mut system,
662            &mut messages,
663            &mut tools,
664            true,
665            top_level_cache_control.as_ref(),
666        )
667        .unwrap();
668
669        assert_eq!(tools[0]["cache_control"]["type"], "ephemeral");
670        assert_eq!(tools[0]["cache_control"]["ttl"], "1h");
671        match &system[0] {
672            SystemContent::Text {
673                cache_control: Some(CacheControl::Ephemeral { ttl }),
674                ..
675            } => assert_eq!(ttl.as_ref(), Some(&CacheTtl::OneHour)),
676            other => panic!("expected system cache_control, got {other:?}"),
677        }
678        assert!(additional_params.get("cache_control").is_none());
679    }
680
681    fn handle_event(
682        event: &StreamingEvent,
683        current_tool_call: &mut Option<ToolCallState>,
684        current_thinking: &mut Option<ThinkingState>,
685    ) -> Option<Result<RawStreamingChoice<StreamingCompletionResponse>, CompletionError>> {
686        let mut server_tool_uses = HashMap::new();
687        super::handle_event(
688            event,
689            current_tool_call,
690            &mut server_tool_uses,
691            current_thinking,
692        )
693    }
694
695    #[test]
696    fn test_thinking_delta_deserialization() {
697        let json = r#"{"type": "thinking_delta", "thinking": "Let me think about this..."}"#;
698        let delta: ContentDelta = serde_json::from_str(json).unwrap();
699
700        match delta {
701            ContentDelta::ThinkingDelta { thinking } => {
702                assert_eq!(thinking, "Let me think about this...");
703            }
704            _ => panic!("Expected ThinkingDelta variant"),
705        }
706    }
707
708    #[test]
709    fn test_signature_delta_deserialization() {
710        let json = r#"{"type": "signature_delta", "signature": "abc123def456"}"#;
711        let delta: ContentDelta = serde_json::from_str(json).unwrap();
712
713        match delta {
714            ContentDelta::SignatureDelta { signature } => {
715                assert_eq!(signature, "abc123def456");
716            }
717            _ => panic!("Expected SignatureDelta variant"),
718        }
719    }
720
721    #[test]
722    fn test_thinking_delta_streaming_event_deserialization() {
723        let json = r#"{
724            "type": "content_block_delta",
725            "index": 0,
726            "delta": {
727                "type": "thinking_delta",
728                "thinking": "First, I need to understand the problem."
729            }
730        }"#;
731
732        let event: StreamingEvent = serde_json::from_str(json).unwrap();
733
734        match event {
735            StreamingEvent::ContentBlockDelta { index, delta } => {
736                assert_eq!(index, 0);
737                match delta {
738                    ContentDelta::ThinkingDelta { thinking } => {
739                        assert_eq!(thinking, "First, I need to understand the problem.");
740                    }
741                    _ => panic!("Expected ThinkingDelta"),
742                }
743            }
744            _ => panic!("Expected ContentBlockDelta event"),
745        }
746    }
747
748    #[test]
749    fn test_signature_delta_streaming_event_deserialization() {
750        let json = r#"{
751            "type": "content_block_delta",
752            "index": 0,
753            "delta": {
754                "type": "signature_delta",
755                "signature": "ErUBCkYICBgCIkCaGbqC85F4"
756            }
757        }"#;
758
759        let event: StreamingEvent = serde_json::from_str(json).unwrap();
760
761        match event {
762            StreamingEvent::ContentBlockDelta { index, delta } => {
763                assert_eq!(index, 0);
764                match delta {
765                    ContentDelta::SignatureDelta { signature } => {
766                        assert_eq!(signature, "ErUBCkYICBgCIkCaGbqC85F4");
767                    }
768                    _ => panic!("Expected SignatureDelta"),
769                }
770            }
771            _ => panic!("Expected ContentBlockDelta event"),
772        }
773    }
774
775    #[test]
776    fn test_handle_thinking_delta_event() {
777        let event = StreamingEvent::ContentBlockDelta {
778            index: 0,
779            delta: ContentDelta::ThinkingDelta {
780                thinking: "Analyzing the request...".to_string(),
781            },
782        };
783
784        let mut tool_call_state = None;
785        let mut thinking_state = None;
786        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
787
788        assert!(result.is_some());
789        let choice = result.unwrap().unwrap();
790
791        match choice {
792            RawStreamingChoice::ReasoningDelta { id, reasoning, .. } => {
793                assert_eq!(id, None);
794                assert_eq!(reasoning, "Analyzing the request...");
795            }
796            _ => panic!("Expected ReasoningDelta choice"),
797        }
798
799        // Verify thinking state was updated
800        assert!(thinking_state.is_some());
801        assert_eq!(thinking_state.unwrap().thinking, "Analyzing the request...");
802    }
803
804    #[test]
805    fn test_handle_signature_delta_event() {
806        let event = StreamingEvent::ContentBlockDelta {
807            index: 0,
808            delta: ContentDelta::SignatureDelta {
809                signature: "test_signature".to_string(),
810            },
811        };
812
813        let mut tool_call_state = None;
814        let mut thinking_state = None;
815        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
816
817        // SignatureDelta should not yield anything (returns None)
818        assert!(result.is_none());
819
820        // But signature should be captured in thinking state
821        assert!(thinking_state.is_some());
822        assert_eq!(thinking_state.unwrap().signature, "test_signature");
823    }
824
825    #[test]
826    fn test_handle_redacted_thinking_content_block_start_event() {
827        let event = StreamingEvent::ContentBlockStart {
828            index: 0,
829            content_block: Content::RedactedThinking {
830                data: "redacted_blob".to_string(),
831            },
832        };
833        let mut tool_call_state = None;
834        let mut thinking_state = None;
835        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
836
837        assert!(result.is_some());
838        match result.unwrap().unwrap() {
839            RawStreamingChoice::Reasoning {
840                content: ReasoningContent::Redacted { data },
841                ..
842            } => {
843                assert_eq!(data, "redacted_blob");
844            }
845            _ => panic!("Expected Redacted reasoning chunk"),
846        }
847    }
848
849    #[test]
850    fn test_handle_text_delta_event() {
851        let event = StreamingEvent::ContentBlockDelta {
852            index: 0,
853            delta: ContentDelta::TextDelta {
854                text: "Hello, world!".to_string(),
855            },
856        };
857
858        let mut tool_call_state = None;
859        let mut thinking_state = None;
860        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
861
862        assert!(result.is_some());
863        let choice = result.unwrap().unwrap();
864
865        match choice {
866            RawStreamingChoice::Message(text) => {
867                assert_eq!(text, "Hello, world!");
868            }
869            _ => panic!("Expected Message choice"),
870        }
871    }
872
873    #[test]
874    fn test_handle_text_block_start_event() {
875        let event = StreamingEvent::ContentBlockStart {
876            index: 0,
877            content_block: Content::Text {
878                text: String::new(),
879                citations: Vec::new(),
880                cache_control: None,
881            },
882        };
883
884        let mut tool_call_state = None;
885        let mut thinking_state = None;
886        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
887
888        assert!(result.is_some());
889        let choice = result.unwrap().unwrap();
890        assert!(matches!(
891            choice,
892            RawStreamingChoice::TextStart {
893                additional_params: None
894            }
895        ));
896    }
897
898    #[test]
899    fn test_thinking_delta_does_not_interfere_with_tool_calls() {
900        // Thinking deltas should still be processed even if a tool call is in progress
901        let event = StreamingEvent::ContentBlockDelta {
902            index: 0,
903            delta: ContentDelta::ThinkingDelta {
904                thinking: "Thinking while tool is active...".to_string(),
905            },
906        };
907
908        let mut tool_call_state = Some(ToolCallState {
909            name: "test_tool".to_string(),
910            id: "tool_123".to_string(),
911            internal_call_id: nanoid::nanoid!(),
912            input_json: String::new(),
913        });
914        let mut thinking_state = None;
915
916        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
917
918        assert!(result.is_some());
919        let choice = result.unwrap().unwrap();
920
921        match choice {
922            RawStreamingChoice::ReasoningDelta { reasoning, .. } => {
923                assert_eq!(reasoning, "Thinking while tool is active...");
924            }
925            _ => panic!("Expected ReasoningDelta choice"),
926        }
927
928        // Tool call state should remain unchanged
929        assert!(tool_call_state.is_some());
930    }
931
932    #[test]
933    fn test_handle_input_json_delta_event() {
934        let event = StreamingEvent::ContentBlockDelta {
935            index: 0,
936            delta: ContentDelta::InputJsonDelta {
937                partial_json: "{\"arg\":\"value".to_string(),
938            },
939        };
940
941        let mut tool_call_state = Some(ToolCallState {
942            name: "test_tool".to_string(),
943            id: "tool_123".to_string(),
944            internal_call_id: nanoid::nanoid!(),
945            input_json: String::new(),
946        });
947        let mut thinking_state = None;
948
949        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
950
951        // Should emit a ToolCallDelta
952        assert!(result.is_some());
953        let choice = result.unwrap().unwrap();
954
955        match choice {
956            RawStreamingChoice::ToolCallDelta {
957                id,
958                internal_call_id: _,
959                content,
960            } => {
961                assert_eq!(id, "tool_123");
962                match content {
963                    ToolCallDeltaContent::Delta(delta) => assert_eq!(delta, "{\"arg\":\"value"),
964                    _ => panic!("Expected Delta content"),
965                }
966            }
967            _ => panic!("Expected ToolCallDelta choice, got {:?}", choice),
968        }
969
970        // Verify the input_json was accumulated
971        assert!(tool_call_state.is_some());
972        let state = tool_call_state.unwrap();
973        assert_eq!(state.input_json, "{\"arg\":\"value");
974    }
975
976    #[test]
977    fn test_tool_call_accumulation_with_multiple_deltas() {
978        let mut tool_call_state = Some(ToolCallState {
979            name: "test_tool".to_string(),
980            id: "tool_123".to_string(),
981            internal_call_id: nanoid::nanoid!(),
982            input_json: String::new(),
983        });
984        let mut thinking_state = None;
985
986        // First delta
987        let event1 = StreamingEvent::ContentBlockDelta {
988            index: 0,
989            delta: ContentDelta::InputJsonDelta {
990                partial_json: "{\"location\":".to_string(),
991            },
992        };
993        let result1 = handle_event(&event1, &mut tool_call_state, &mut thinking_state);
994        assert!(result1.is_some());
995
996        // Second delta
997        let event2 = StreamingEvent::ContentBlockDelta {
998            index: 0,
999            delta: ContentDelta::InputJsonDelta {
1000                partial_json: "\"Paris\",".to_string(),
1001            },
1002        };
1003        let result2 = handle_event(&event2, &mut tool_call_state, &mut thinking_state);
1004        assert!(result2.is_some());
1005
1006        // Third delta
1007        let event3 = StreamingEvent::ContentBlockDelta {
1008            index: 0,
1009            delta: ContentDelta::InputJsonDelta {
1010                partial_json: "\"temp\":\"20C\"}".to_string(),
1011            },
1012        };
1013        let result3 = handle_event(&event3, &mut tool_call_state, &mut thinking_state);
1014        assert!(result3.is_some());
1015
1016        // Verify accumulated JSON
1017        assert!(tool_call_state.is_some());
1018        let state = tool_call_state.as_ref().unwrap();
1019        assert_eq!(
1020            state.input_json,
1021            "{\"location\":\"Paris\",\"temp\":\"20C\"}"
1022        );
1023
1024        // Final ContentBlockStop should emit complete tool call
1025        let stop_event = StreamingEvent::ContentBlockStop { index: 0 };
1026        let final_result = handle_event(&stop_event, &mut tool_call_state, &mut thinking_state);
1027        assert!(final_result.is_some());
1028
1029        match final_result.unwrap().unwrap() {
1030            RawStreamingChoice::ToolCall(RawStreamingToolCall {
1031                id,
1032                name,
1033                arguments,
1034                ..
1035            }) => {
1036                assert_eq!(id, "tool_123");
1037                assert_eq!(name, "test_tool");
1038                assert_eq!(
1039                    arguments.get("location").unwrap().as_str().unwrap(),
1040                    "Paris"
1041                );
1042                assert_eq!(arguments.get("temp").unwrap().as_str().unwrap(), "20C");
1043            }
1044            other => panic!("Expected ToolCall, got {:?}", other),
1045        }
1046
1047        // Tool call state should be taken
1048        assert!(tool_call_state.is_none());
1049    }
1050
1051    #[test]
1052    fn test_citations_delta_streaming_event_deserialization() {
1053        let json = r#"{
1054            "type": "content_block_delta",
1055            "index": 0,
1056            "delta": {
1057                "type": "citations_delta",
1058                "citation": {
1059                    "type": "char_location",
1060                    "cited_text": "The grass is green.",
1061                    "document_index": 0,
1062                    "document_title": "Example",
1063                    "start_char_index": 0,
1064                    "end_char_index": 20
1065                }
1066            }
1067        }"#;
1068
1069        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1070        let StreamingEvent::ContentBlockDelta { index, delta } = event else {
1071            panic!("expected ContentBlockDelta");
1072        };
1073        assert_eq!(index, 0);
1074        let ContentDelta::CitationsDelta { citation } = delta else {
1075            panic!("expected CitationsDelta");
1076        };
1077        let crate::providers::anthropic::completion::Citation::CharLocation {
1078            start_char_index,
1079            end_char_index,
1080            ..
1081        } = citation
1082        else {
1083            panic!("expected CharLocation");
1084        };
1085        assert_eq!(start_char_index, 0);
1086        assert_eq!(end_char_index, 20);
1087    }
1088
1089    #[test]
1090    fn test_search_result_citations_delta_streaming_event_deserialization() {
1091        let json = r#"{
1092            "type": "content_block_delta",
1093            "index": 0,
1094            "delta": {
1095                "type": "citations_delta",
1096                "citation": {
1097                    "type": "search_result_location",
1098                    "cited_text": "API requests require a key.",
1099                    "source": "https://docs.example.com/api-reference",
1100                    "title": "API Reference",
1101                    "search_result_index": 0,
1102                    "start_block_index": 0,
1103                    "end_block_index": 1
1104                }
1105            }
1106        }"#;
1107
1108        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1109        let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1110            panic!("expected ContentBlockDelta");
1111        };
1112        let ContentDelta::CitationsDelta { citation } = delta else {
1113            panic!("expected CitationsDelta");
1114        };
1115        assert!(matches!(
1116            citation,
1117            crate::providers::anthropic::completion::Citation::SearchResultLocation {
1118                search_result_index: 0,
1119                start_block_index: 0,
1120                end_block_index: 1,
1121                ..
1122            }
1123        ));
1124    }
1125
1126    #[test]
1127    fn test_web_search_result_citations_delta_streaming_event_deserialization() {
1128        let json = r#"{
1129            "type": "content_block_delta",
1130            "index": 0,
1131            "delta": {
1132                "type": "citations_delta",
1133                "citation": {
1134                    "type": "web_search_result_location",
1135                    "cited_text": "Claude Shannon was a mathematician.",
1136                    "url": "https://example.com/shannon",
1137                    "title": "Claude Shannon",
1138                    "encrypted_index": "encrypted-reference"
1139                }
1140            }
1141        }"#;
1142
1143        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1144        let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1145            panic!("expected ContentBlockDelta");
1146        };
1147        let ContentDelta::CitationsDelta { citation } = delta else {
1148            panic!("expected CitationsDelta");
1149        };
1150        assert!(matches!(
1151            citation,
1152            crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1153                ref url,
1154                ref encrypted_index,
1155                ..
1156            } if url == "https://example.com/shannon"
1157                && encrypted_index == "encrypted-reference"
1158        ));
1159    }
1160
1161    #[test]
1162    fn test_web_search_result_citations_delta_allows_null_title() {
1163        let json = r#"{
1164            "type": "content_block_delta",
1165            "index": 0,
1166            "delta": {
1167                "type": "citations_delta",
1168                "citation": {
1169                    "type": "web_search_result_location",
1170                    "cited_text": "Claude Shannon was a mathematician.",
1171                    "url": "https://example.com/shannon",
1172                    "title": null,
1173                    "encrypted_index": "encrypted-reference"
1174                }
1175            }
1176        }"#;
1177
1178        let event: StreamingEvent = serde_json::from_str(json).unwrap();
1179        let StreamingEvent::ContentBlockDelta { delta, .. } = event else {
1180            panic!("expected ContentBlockDelta");
1181        };
1182        let ContentDelta::CitationsDelta { citation } = delta else {
1183            panic!("expected CitationsDelta");
1184        };
1185        assert!(matches!(
1186            citation,
1187            crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1188                title: None,
1189                ..
1190            }
1191        ));
1192    }
1193
1194    #[test]
1195    fn test_web_search_content_block_start_events_deserialize() {
1196        let server_tool_use = r#"{
1197            "type": "content_block_start",
1198            "index": 1,
1199            "content_block": {
1200                "type": "server_tool_use",
1201                "id": "srvtoolu_01",
1202                "name": "web_search",
1203                "input": {
1204                    "query": "claude shannon birth date"
1205                }
1206            }
1207        }"#;
1208        let event: StreamingEvent = serde_json::from_str(server_tool_use).unwrap();
1209        assert!(matches!(
1210            event,
1211            StreamingEvent::ContentBlockStart {
1212                content_block: Content::ServerToolUse {
1213                    ref id,
1214                    ref name,
1215                    ref input
1216                },
1217                ..
1218            } if id == "srvtoolu_01"
1219                && name == "web_search"
1220                && input["query"] == "claude shannon birth date"
1221        ));
1222
1223        let web_search_tool_result = r#"{
1224            "type": "content_block_start",
1225            "index": 2,
1226            "content_block": {
1227                "type": "web_search_tool_result",
1228                "tool_use_id": "srvtoolu_01",
1229                "content": [{
1230                    "type": "web_search_result",
1231                    "url": "https://example.com/shannon",
1232                    "title": "Claude Shannon",
1233                    "encrypted_content": "encrypted-content"
1234                }]
1235            }
1236        }"#;
1237        let event: StreamingEvent = serde_json::from_str(web_search_tool_result).unwrap();
1238        assert!(matches!(
1239            event,
1240            StreamingEvent::ContentBlockStart {
1241                content_block: Content::WebSearchToolResult {
1242                    ref tool_use_id,
1243                    ref content
1244                },
1245                ..
1246            } if tool_use_id == "srvtoolu_01"
1247                && content[0]["encrypted_content"] == "encrypted-content"
1248        ));
1249    }
1250
1251    #[tokio::test]
1252    async fn test_streaming_web_search_blocks_are_preserved_on_final_choice() {
1253        let raw_stream = stream! {
1254            let mut tool_call_state = None;
1255            let mut server_tool_uses = HashMap::new();
1256            let mut thinking_state = None;
1257
1258            let server_tool_use_start = super::handle_event(
1259                &StreamingEvent::ContentBlockStart {
1260                    index: 0,
1261                    content_block: Content::ServerToolUse {
1262                        id: "srvtoolu_01".to_string(),
1263                        name: "web_search".to_string(),
1264                        input: serde_json::Value::Null,
1265                    },
1266                },
1267                &mut tool_call_state,
1268                &mut server_tool_uses,
1269                &mut thinking_state,
1270            );
1271            assert!(
1272                server_tool_use_start.is_none(),
1273                "server_tool_use start should be accumulated until its input JSON is complete"
1274            );
1275
1276            let server_tool_use_delta = super::handle_event(
1277                &StreamingEvent::ContentBlockDelta {
1278                    index: 0,
1279                    delta: ContentDelta::InputJsonDelta {
1280                        partial_json: r#"{"query":"claude shannon birth date"}"#.to_string(),
1281                    },
1282                },
1283                &mut tool_call_state,
1284                &mut server_tool_uses,
1285                &mut thinking_state,
1286            );
1287            assert!(
1288                server_tool_use_delta.is_none(),
1289                "server_tool_use input JSON should not be emitted as a Rig tool-call delta"
1290            );
1291
1292            yield super::handle_event(
1293                &StreamingEvent::ContentBlockStop { index: 0 },
1294                &mut tool_call_state,
1295                &mut server_tool_uses,
1296                &mut thinking_state,
1297            )
1298            .expect("server_tool_use stop should produce completed raw metadata");
1299
1300            yield super::handle_event(
1301                &StreamingEvent::ContentBlockStart {
1302                    index: 1,
1303                    content_block: Content::WebSearchToolResult {
1304                        tool_use_id: "srvtoolu_01".to_string(),
1305                        content: serde_json::json!([{
1306                            "type": "web_search_result",
1307                            "url": "https://example.com/shannon",
1308                            "title": "Claude Shannon",
1309                            "encrypted_content": "encrypted-content"
1310                        }]),
1311                    },
1312                },
1313                &mut tool_call_state,
1314                &mut server_tool_uses,
1315                &mut thinking_state,
1316            )
1317            .expect("web_search_tool_result block should produce raw metadata");
1318
1319            yield super::handle_event(
1320                &StreamingEvent::ContentBlockStart {
1321                    index: 2,
1322                    content_block: Content::Text {
1323                        text: String::new(),
1324                        citations: Vec::new(),
1325                        cache_control: None,
1326                    },
1327                },
1328                &mut tool_call_state,
1329                &mut server_tool_uses,
1330                &mut thinking_state,
1331            )
1332            .expect("text block start should produce a raw choice");
1333
1334            yield super::handle_event(
1335                &StreamingEvent::ContentBlockDelta {
1336                    index: 2,
1337                    delta: ContentDelta::TextDelta {
1338                        text: "Claude Shannon was born on April 30, 1916.".to_string(),
1339                    },
1340                },
1341                &mut tool_call_state,
1342                &mut server_tool_uses,
1343                &mut thinking_state,
1344            )
1345            .expect("text delta should produce a raw choice");
1346
1347            yield super::handle_event(
1348                &StreamingEvent::ContentBlockDelta {
1349                    index: 2,
1350                    delta: ContentDelta::CitationsDelta {
1351                        citation: crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1352                            cited_text: "Claude Shannon was born on April 30, 1916.".to_string(),
1353                            url: "https://example.com/shannon".to_string(),
1354                            title: Some("Claude Shannon".to_string()),
1355                            encrypted_index: "encrypted-index".to_string(),
1356                        },
1357                    },
1358                },
1359                &mut tool_call_state,
1360                &mut server_tool_uses,
1361                &mut thinking_state,
1362            )
1363            .expect("citation delta should produce a raw choice");
1364
1365            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1366                usage: PartialUsage::default(),
1367            }));
1368        };
1369
1370        let mut stream =
1371            crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1372        while stream.next().await.is_some() {}
1373
1374        let choice_items: Vec<crate::message::AssistantContent> =
1375            stream.choice.clone().into_iter().collect();
1376        assert_eq!(choice_items.len(), 3);
1377        assert!(
1378            choice_items
1379                .iter()
1380                .all(|item| !matches!(item, crate::message::AssistantContent::ToolCall(_))),
1381            "provider-owned web-search blocks must not become Rig client tool calls"
1382        );
1383
1384        let Some(crate::message::AssistantContent::Text(server_tool_use)) = choice_items.first()
1385        else {
1386            panic!("expected raw server_tool_use metadata");
1387        };
1388        assert_eq!(
1389            server_tool_use.additional_params.as_ref().unwrap()
1390                [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["type"],
1391            "server_tool_use"
1392        );
1393        assert_eq!(
1394            server_tool_use.additional_params.as_ref().unwrap()
1395                [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["input"]["query"],
1396            "claude shannon birth date"
1397        );
1398
1399        let Some(crate::message::AssistantContent::Text(web_search_result)) = choice_items.get(1)
1400        else {
1401            panic!("expected raw web_search_tool_result metadata");
1402        };
1403        assert_eq!(
1404            web_search_result.additional_params.as_ref().unwrap()
1405                [crate::providers::anthropic::completion::ANTHROPIC_RAW_CONTENT_KEY]["content"][0]
1406                ["encrypted_content"],
1407            "encrypted-content"
1408        );
1409
1410        let Some(crate::message::AssistantContent::Text(answer)) = choice_items.get(2) else {
1411            panic!("expected answer text");
1412        };
1413        assert_eq!(answer.text, "Claude Shannon was born on April 30, 1916.");
1414        let citations = crate::providers::anthropic::completion::anthropic_citations(answer)
1415            .expect("expected preserved citations");
1416        assert!(matches!(
1417            citations.first(),
1418            Some(crate::providers::anthropic::completion::Citation::WebSearchResultLocation {
1419                encrypted_index,
1420                ..
1421            }) if encrypted_index == "encrypted-index"
1422        ));
1423    }
1424
1425    #[test]
1426    fn test_handle_citations_delta_event_preserves_metadata() {
1427        let event = StreamingEvent::ContentBlockDelta {
1428            index: 0,
1429            delta: ContentDelta::CitationsDelta {
1430                citation: crate::providers::anthropic::completion::Citation::CharLocation {
1431                    cited_text: "The grass is green.".to_string(),
1432                    document_index: 0,
1433                    document_title: Some("Example".to_string()),
1434                    start_char_index: 0,
1435                    end_char_index: 20,
1436                },
1437            },
1438        };
1439
1440        let mut tool_call_state = None;
1441        let mut thinking_state = None;
1442        let result = handle_event(&event, &mut tool_call_state, &mut thinking_state);
1443
1444        assert!(result.is_some());
1445        let choice = result.unwrap().unwrap();
1446        let RawStreamingChoice::TextAdditionalParams(additional_params) = choice else {
1447            panic!("expected TextAdditionalParams choice");
1448        };
1449        assert_eq!(additional_params["citations"][0]["type"], "char_location");
1450    }
1451
1452    #[tokio::test]
1453    async fn test_streaming_citation_deltas_are_preserved_on_final_text() {
1454        let citation = crate::providers::anthropic::completion::Citation::CharLocation {
1455            cited_text: "The grass is green.".to_string(),
1456            document_index: 0,
1457            document_title: Some("Example".to_string()),
1458            start_char_index: 0,
1459            end_char_index: 20,
1460        };
1461
1462        let raw_stream = stream! {
1463            let mut tool_call_state = None;
1464            let mut thinking_state = None;
1465
1466            yield handle_event(
1467                &StreamingEvent::ContentBlockStart {
1468                    index: 0,
1469                    content_block: Content::Text {
1470                        text: String::new(),
1471                        citations: Vec::new(),
1472                        cache_control: None,
1473                    },
1474                },
1475                &mut tool_call_state,
1476                &mut thinking_state,
1477            )
1478            .expect("text block start should produce a raw choice");
1479
1480            yield handle_event(
1481                &StreamingEvent::ContentBlockDelta {
1482                    index: 0,
1483                    delta: ContentDelta::TextDelta {
1484                        text: "the grass is green".to_string(),
1485                    },
1486                },
1487                &mut tool_call_state,
1488                &mut thinking_state,
1489            )
1490            .expect("text delta should produce a raw choice");
1491
1492            yield handle_event(
1493                &StreamingEvent::ContentBlockDelta {
1494                    index: 0,
1495                    delta: ContentDelta::CitationsDelta {
1496                        citation: crate::providers::anthropic::completion::Citation::CharLocation {
1497                            cited_text: "The grass is green.".to_string(),
1498                            document_index: 0,
1499                            document_title: Some("Example".to_string()),
1500                            start_char_index: 0,
1501                            end_char_index: 20,
1502                        },
1503                    },
1504                },
1505                &mut tool_call_state,
1506                &mut thinking_state,
1507            )
1508            .expect("citation delta should produce a raw choice");
1509
1510            yield Ok(RawStreamingChoice::FinalResponse(StreamingCompletionResponse {
1511                usage: PartialUsage::default(),
1512            }));
1513        };
1514
1515        let mut stream =
1516            crate::streaming::StreamingCompletionResponse::stream(to_stream_result(raw_stream));
1517        while stream.next().await.is_some() {}
1518
1519        let choice_items: Vec<crate::message::AssistantContent> =
1520            stream.choice.clone().into_iter().collect();
1521        let Some(crate::message::AssistantContent::Text(text)) = choice_items.first() else {
1522            panic!("expected accumulated text item");
1523        };
1524
1525        assert_eq!(text.text, "the grass is green");
1526        let citations = crate::providers::anthropic::completion::anthropic_citations(text).unwrap();
1527        assert_eq!(citations, vec![citation]);
1528    }
1529
1530    #[test]
1531    fn test_unknown_content_delta_falls_back() {
1532        let json = r#"{"type": "something_new_from_anthropic", "field": "x"}"#;
1533        let delta: ContentDelta = serde_json::from_str(json).unwrap();
1534        assert!(matches!(delta, ContentDelta::Unknown));
1535    }
1536}