genai_rs/
wire_streaming.rs

1//! Streaming types for SSE responses.
2
3use serde::{Deserialize, Serialize};
4
5use crate::content::Content;
6use crate::response::{InteractionResponse, InteractionStatus};
7
8/// A chunk from the streaming API
9///
10/// During streaming, the API sends different types of events:
11/// - `Start`: Initial interaction event (first event, contains ID)
12/// - `StatusUpdate`: Status changes during processing
13/// - `ContentStart`: Content generation begins for an output
14/// - `Delta`: Incremental content updates (text, thought, function_call, etc.)
15/// - `ContentStop`: Content generation ends for an output
16/// - `Complete`: The final complete interaction response
17/// - `Error`: Error occurred during streaming
18///
19/// All variants implement `Serialize` and `Deserialize` for logging,
20/// persistence, and replay of streaming events.
21///
22/// # Forward Compatibility
23///
24/// This enum uses `#[non_exhaustive]` to allow adding new chunk types in future
25/// versions without breaking existing code. Always include a wildcard arm in
26/// match statements. Unknown chunk types deserialize to the `Unknown` variant
27/// with their data preserved.
28#[derive(Clone, Debug)]
29#[non_exhaustive]
30#[allow(clippy::large_enum_variant)]
31pub enum StreamChunk {
32    /// Interaction started (first event, contains ID).
33    ///
34    /// Sent when the interaction is accepted by the API. Provides early access
35    /// to the interaction ID before any content is generated.
36    Start {
37        /// The full interaction response at start time
38        interaction: InteractionResponse,
39    },
40
41    /// Status update for in-progress interaction.
42    ///
43    /// Sent when the interaction status changes during processing.
44    /// Useful for tracking progress of background/agent interactions.
45    StatusUpdate {
46        /// The interaction ID
47        interaction_id: String,
48        /// The updated status
49        status: InteractionStatus,
50    },
51
52    /// Content generation started for an output.
53    ///
54    /// Sent when a new content block begins generation.
55    /// The `index` indicates which output position this content will occupy.
56    ContentStart {
57        /// Position index for this content block
58        index: usize,
59        /// The content type being started (e.g., "text", "thought")
60        content_type: Option<String>,
61    },
62
63    /// Incremental content update
64    Delta(Content),
65
66    /// Content generation stopped for an output.
67    ///
68    /// Sent when a content block finishes generation.
69    ContentStop {
70        /// Position index for the completed content block
71        index: usize,
72    },
73
74    /// Complete interaction response (final event)
75    Complete(InteractionResponse),
76
77    /// Error occurred during streaming.
78    ///
79    /// Indicates a terminal error condition. The stream will end after this event.
80    Error {
81        /// Human-readable error message
82        message: String,
83        /// Error code from the API (if provided)
84        code: Option<String>,
85    },
86
87    /// Unknown chunk type (for forward compatibility).
88    ///
89    /// This variant is used when deserializing JSON that contains an unrecognized
90    /// `chunk_type`. This allows the library to gracefully handle new chunk types
91    /// added by the API in future versions without failing deserialization.
92    ///
93    /// The `chunk_type` field contains the unrecognized type string, and `data`
94    /// contains the full JSON data for inspection or debugging.
95    Unknown {
96        /// The unrecognized chunk type from the API
97        chunk_type: String,
98        /// The raw JSON data, preserved for debugging and roundtrip serialization
99        data: serde_json::Value,
100    },
101}
102
103impl StreamChunk {
104    /// Check if this is an unknown chunk type.
105    #[must_use]
106    pub const fn is_unknown(&self) -> bool {
107        matches!(self, Self::Unknown { .. })
108    }
109
110    /// Returns the chunk type name if this is an unknown chunk type.
111    ///
112    /// Returns `None` for known chunk types.
113    #[must_use]
114    pub fn unknown_chunk_type(&self) -> Option<&str> {
115        match self {
116            Self::Unknown { chunk_type, .. } => Some(chunk_type),
117            _ => None,
118        }
119    }
120
121    /// Returns the raw JSON data if this is an unknown chunk type.
122    ///
123    /// Returns `None` for known chunk types.
124    #[must_use]
125    pub fn unknown_data(&self) -> Option<&serde_json::Value> {
126        match self {
127            Self::Unknown { data, .. } => Some(data),
128            _ => None,
129        }
130    }
131
132    /// Returns the interaction ID if this event contains one.
133    ///
134    /// Available for `Start`, `StatusUpdate`, and `Complete` variants.
135    ///
136    /// # Example
137    ///
138    /// ```no_run
139    /// # use genai_rs::StreamChunk;
140    /// # fn example(chunk: StreamChunk) {
141    /// if let Some(id) = chunk.interaction_id() {
142    ///     println!("Interaction ID: {}", id);
143    /// }
144    /// # }
145    /// ```
146    #[must_use]
147    pub fn interaction_id(&self) -> Option<&str> {
148        match self {
149            Self::Start { interaction } => interaction.id.as_deref(),
150            Self::StatusUpdate { interaction_id, .. } => Some(interaction_id),
151            Self::Complete(response) => response.id.as_deref(),
152            _ => None,
153        }
154    }
155
156    /// Returns true if this is a terminal event.
157    ///
158    /// Terminal events indicate the stream has ended (either successfully or with an error).
159    /// After receiving a terminal event, no more events will be sent.
160    ///
161    /// Terminal events are:
162    /// - `Complete`: Successful completion
163    /// - `Error`: Error occurred
164    ///
165    /// # Example
166    ///
167    /// ```no_run
168    /// # use genai_rs::StreamChunk;
169    /// # fn example(chunk: StreamChunk) {
170    /// if chunk.is_terminal() {
171    ///     println!("Stream has ended");
172    /// }
173    /// # }
174    /// ```
175    #[must_use]
176    pub const fn is_terminal(&self) -> bool {
177        matches!(self, Self::Complete(_) | Self::Error { .. })
178    }
179
180    /// Returns the status if this event contains one.
181    ///
182    /// Available for `StatusUpdate` and `Complete` variants.
183    ///
184    /// # Example
185    ///
186    /// ```no_run
187    /// # use genai_rs::StreamChunk;
188    /// # fn example(chunk: StreamChunk) {
189    /// if let Some(status) = chunk.status() {
190    ///     println!("Status: {:?}", status);
191    /// }
192    /// # }
193    /// ```
194    #[must_use]
195    pub fn status(&self) -> Option<&InteractionStatus> {
196        match self {
197            Self::StatusUpdate { status, .. } => Some(status),
198            Self::Complete(response) => Some(&response.status),
199            _ => None,
200        }
201    }
202}
203
204impl Serialize for StreamChunk {
205    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
206    where
207        S: serde::Serializer,
208    {
209        use serde::ser::SerializeMap;
210
211        match self {
212            Self::Start { interaction } => {
213                let mut map = serializer.serialize_map(None)?;
214                map.serialize_entry("chunk_type", "start")?;
215                map.serialize_entry("data", interaction)?;
216                map.end()
217            }
218            Self::StatusUpdate {
219                interaction_id,
220                status,
221            } => {
222                let mut map = serializer.serialize_map(None)?;
223                map.serialize_entry("chunk_type", "status_update")?;
224                map.serialize_entry(
225                    "data",
226                    &serde_json::json!({
227                        "interaction_id": interaction_id,
228                        "status": status,
229                    }),
230                )?;
231                map.end()
232            }
233            Self::ContentStart {
234                index,
235                content_type,
236            } => {
237                let mut map = serializer.serialize_map(None)?;
238                map.serialize_entry("chunk_type", "content_start")?;
239                map.serialize_entry(
240                    "data",
241                    &serde_json::json!({
242                        "index": index,
243                        "content_type": content_type,
244                    }),
245                )?;
246                map.end()
247            }
248            Self::Delta(content) => {
249                let mut map = serializer.serialize_map(None)?;
250                map.serialize_entry("chunk_type", "delta")?;
251                map.serialize_entry("data", content)?;
252                map.end()
253            }
254            Self::ContentStop { index } => {
255                let mut map = serializer.serialize_map(None)?;
256                map.serialize_entry("chunk_type", "content_stop")?;
257                map.serialize_entry("data", &serde_json::json!({ "index": index }))?;
258                map.end()
259            }
260            Self::Complete(response) => {
261                let mut map = serializer.serialize_map(None)?;
262                map.serialize_entry("chunk_type", "complete")?;
263                map.serialize_entry("data", response)?;
264                map.end()
265            }
266            Self::Error { message, code } => {
267                let mut map = serializer.serialize_map(None)?;
268                map.serialize_entry("chunk_type", "error")?;
269                map.serialize_entry(
270                    "data",
271                    &serde_json::json!({
272                        "message": message,
273                        "code": code,
274                    }),
275                )?;
276                map.end()
277            }
278            Self::Unknown { chunk_type, data } => {
279                let mut map = serializer.serialize_map(None)?;
280                map.serialize_entry("chunk_type", chunk_type)?;
281                if !data.is_null() {
282                    map.serialize_entry("data", data)?;
283                }
284                map.end()
285            }
286        }
287    }
288}
289
290impl<'de> Deserialize<'de> for StreamChunk {
291    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
292    where
293        D: serde::Deserializer<'de>,
294    {
295        let value = serde_json::Value::deserialize(deserializer)?;
296
297        let chunk_type = match value.get("chunk_type") {
298            Some(serde_json::Value::String(s)) => s.as_str(),
299            Some(other) => {
300                tracing::warn!(
301                    "StreamChunk received non-string chunk_type: {}. \
302                     This may indicate a malformed API response.",
303                    other
304                );
305                "<non-string chunk_type>"
306            }
307            None => {
308                tracing::warn!(
309                    "StreamChunk is missing required chunk_type field. \
310                     This may indicate a malformed API response."
311                );
312                "<missing chunk_type>"
313            }
314        };
315
316        match chunk_type {
317            "start" => {
318                let data = match value.get("data").cloned() {
319                    Some(d) => d,
320                    None => {
321                        tracing::warn!(
322                            "StreamChunk::Start is missing the 'data' field. \
323                             This may indicate a malformed API response."
324                        );
325                        serde_json::Value::Null
326                    }
327                };
328                let interaction: InteractionResponse =
329                    serde_json::from_value(data).map_err(|e| {
330                        serde::de::Error::custom(format!(
331                            "Failed to deserialize StreamChunk::Start data: {}",
332                            e
333                        ))
334                    })?;
335                Ok(Self::Start { interaction })
336            }
337            "status_update" => {
338                let data = value
339                    .get("data")
340                    .cloned()
341                    .unwrap_or(serde_json::Value::Null);
342                let interaction_id = data
343                    .get("interaction_id")
344                    .and_then(|v| v.as_str())
345                    .map(String::from)
346                    .unwrap_or_else(|| {
347                        tracing::warn!(
348                            "StreamChunk::StatusUpdate is missing interaction_id. \
349                             This may indicate a malformed API response."
350                        );
351                        String::new()
352                    });
353                let status: InteractionStatus = data
354                    .get("status")
355                    .cloned()
356                    .map(serde_json::from_value)
357                    .transpose()
358                    .map_err(|e| {
359                        serde::de::Error::custom(format!(
360                            "Failed to deserialize StreamChunk::StatusUpdate status: {}",
361                            e
362                        ))
363                    })?
364                    .unwrap_or_else(|| {
365                        tracing::warn!(
366                            "StreamChunk::StatusUpdate is missing status. \
367                             This may indicate a malformed API response."
368                        );
369                        InteractionStatus::InProgress
370                    });
371                Ok(Self::StatusUpdate {
372                    interaction_id,
373                    status,
374                })
375            }
376            "content_start" => {
377                let data = value
378                    .get("data")
379                    .cloned()
380                    .unwrap_or(serde_json::Value::Null);
381                let index = data
382                    .get("index")
383                    .and_then(|v| v.as_u64())
384                    .map(|v| v as usize)
385                    .unwrap_or_else(|| {
386                        tracing::warn!(
387                            "StreamChunk::ContentStart is missing index. \
388                             This may indicate a malformed API response."
389                        );
390                        0
391                    });
392                let content_type = data
393                    .get("content_type")
394                    .and_then(|v| v.as_str())
395                    .map(String::from);
396                Ok(Self::ContentStart {
397                    index,
398                    content_type,
399                })
400            }
401            "delta" => {
402                let data = match value.get("data").cloned() {
403                    Some(d) => d,
404                    None => {
405                        tracing::warn!(
406                            "StreamChunk::Delta is missing the 'data' field. \
407                             This may indicate a malformed API response."
408                        );
409                        serde_json::Value::Null
410                    }
411                };
412                let content: Content = serde_json::from_value(data).map_err(|e| {
413                    serde::de::Error::custom(format!(
414                        "Failed to deserialize StreamChunk::Delta data: {}",
415                        e
416                    ))
417                })?;
418                Ok(Self::Delta(content))
419            }
420            "content_stop" => {
421                let data = value
422                    .get("data")
423                    .cloned()
424                    .unwrap_or(serde_json::Value::Null);
425                let index = data
426                    .get("index")
427                    .and_then(|v| v.as_u64())
428                    .map(|v| v as usize)
429                    .unwrap_or_else(|| {
430                        tracing::warn!(
431                            "StreamChunk::ContentStop is missing index. \
432                             This may indicate a malformed API response."
433                        );
434                        0
435                    });
436                Ok(Self::ContentStop { index })
437            }
438            "complete" => {
439                let data = match value.get("data").cloned() {
440                    Some(d) => d,
441                    None => {
442                        tracing::warn!(
443                            "StreamChunk::Complete is missing the 'data' field. \
444                             This may indicate a malformed API response."
445                        );
446                        serde_json::Value::Null
447                    }
448                };
449                let response: InteractionResponse = serde_json::from_value(data).map_err(|e| {
450                    serde::de::Error::custom(format!(
451                        "Failed to deserialize StreamChunk::Complete data: {}",
452                        e
453                    ))
454                })?;
455                Ok(Self::Complete(response))
456            }
457            "error" => {
458                let data = value
459                    .get("data")
460                    .cloned()
461                    .unwrap_or(serde_json::Value::Null);
462                let message = data
463                    .get("message")
464                    .and_then(|v| v.as_str())
465                    .map(String::from)
466                    .unwrap_or_else(|| {
467                        tracing::warn!(
468                            "StreamChunk::Error is missing message. \
469                             This may indicate a malformed API response."
470                        );
471                        "Unknown error".to_string()
472                    });
473                let code = data.get("code").and_then(|v| v.as_str()).map(String::from);
474                Ok(Self::Error { message, code })
475            }
476            other => {
477                tracing::warn!(
478                    "Encountered unknown StreamChunk type '{}'. \
479                     This may indicate a new API feature. \
480                     The chunk will be preserved in the Unknown variant.",
481                    other
482                );
483                let data = value
484                    .get("data")
485                    .cloned()
486                    .unwrap_or(serde_json::Value::Null);
487                Ok(Self::Unknown {
488                    chunk_type: other.to_string(),
489                    data,
490                })
491            }
492        }
493    }
494}
495
496/// A streaming event with position metadata for resume support.
497///
498/// This wrapper pairs a [`StreamChunk`] with its `event_id`, enabling stream resumption
499/// after network interruptions. To resume a stream, pass the `event_id` from the last
500/// successfully received event to resume the stream.
501///
502/// # Example
503///
504/// ```ignore
505/// let mut last_event_id = None;
506/// let mut stream = client.interaction().with_model("gemini-3-flash-preview")
507///     .with_text("Count to 100").create_stream();
508///
509/// while let Some(result) = stream.next().await {
510///     let event = result?;
511///     last_event_id = event.event_id.clone();  // Track for resume
512///     match event.chunk {
513///         StreamChunk::Delta(content) => { /* process */ }
514///         StreamChunk::Complete(response) => { /* done */ }
515///         _ => {}
516///     }
517/// }
518///
519/// // If interrupted, resume from last_event_id:
520/// let resumed_stream = client.get_interaction_stream(&interaction_id, last_event_id.as_deref());
521/// ```
522#[derive(Clone, Debug)]
523#[non_exhaustive]
524pub struct StreamEvent {
525    /// The chunk content (Delta, Complete, or Unknown).
526    pub chunk: StreamChunk,
527
528    /// Event ID for stream resumption.
529    ///
530    /// Pass this to `last_event_id` when calling `get_interaction_stream()` to resume
531    /// the stream from this point. Events are ordered, so resuming from an event_id
532    /// will replay all subsequent events.
533    pub event_id: Option<String>,
534}
535
536impl StreamEvent {
537    /// Creates a new StreamEvent with the given chunk and event_id.
538    #[must_use]
539    pub fn new(chunk: StreamChunk, event_id: Option<String>) -> Self {
540        Self { chunk, event_id }
541    }
542
543    /// Returns `true` if the chunk is a Delta variant.
544    #[must_use]
545    pub const fn is_delta(&self) -> bool {
546        matches!(self.chunk, StreamChunk::Delta(_))
547    }
548
549    /// Returns `true` if the chunk is a Complete variant.
550    #[must_use]
551    pub const fn is_complete(&self) -> bool {
552        matches!(self.chunk, StreamChunk::Complete(_))
553    }
554
555    /// Returns `true` if the chunk is an Unknown variant.
556    #[must_use]
557    pub const fn is_unknown(&self) -> bool {
558        self.chunk.is_unknown()
559    }
560
561    /// Returns `true` if the chunk is a terminal event (Complete or Error).
562    #[must_use]
563    pub const fn is_terminal(&self) -> bool {
564        self.chunk.is_terminal()
565    }
566
567    /// Returns the interaction ID from the chunk, if available.
568    #[must_use]
569    pub fn interaction_id(&self) -> Option<&str> {
570        self.chunk.interaction_id()
571    }
572
573    /// Returns the status from the chunk, if available.
574    #[must_use]
575    pub fn status(&self) -> Option<&InteractionStatus> {
576        self.chunk.status()
577    }
578
579    /// Returns the unrecognized chunk type if this is an Unknown variant.
580    #[must_use]
581    pub fn unknown_chunk_type(&self) -> Option<&str> {
582        self.chunk.unknown_chunk_type()
583    }
584
585    /// Returns the preserved JSON data if this is an Unknown variant.
586    #[must_use]
587    pub fn unknown_data(&self) -> Option<&serde_json::Value> {
588        self.chunk.unknown_data()
589    }
590}
591
592impl Serialize for StreamEvent {
593    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
594    where
595        S: serde::Serializer,
596    {
597        use serde::ser::SerializeMap;
598
599        // Serialize as a map containing the chunk fields plus event_id
600        let mut map = serializer.serialize_map(None)?;
601
602        // First serialize the chunk's fields (delegate to StreamChunk's logic)
603        match &self.chunk {
604            StreamChunk::Start { interaction } => {
605                map.serialize_entry("chunk_type", "start")?;
606                map.serialize_entry("data", interaction)?;
607            }
608            StreamChunk::StatusUpdate {
609                interaction_id,
610                status,
611            } => {
612                map.serialize_entry("chunk_type", "status_update")?;
613                map.serialize_entry(
614                    "data",
615                    &serde_json::json!({
616                        "interaction_id": interaction_id,
617                        "status": status,
618                    }),
619                )?;
620            }
621            StreamChunk::ContentStart {
622                index,
623                content_type,
624            } => {
625                map.serialize_entry("chunk_type", "content_start")?;
626                map.serialize_entry(
627                    "data",
628                    &serde_json::json!({
629                        "index": index,
630                        "content_type": content_type,
631                    }),
632                )?;
633            }
634            StreamChunk::Delta(content) => {
635                map.serialize_entry("chunk_type", "delta")?;
636                map.serialize_entry("data", content)?;
637            }
638            StreamChunk::ContentStop { index } => {
639                map.serialize_entry("chunk_type", "content_stop")?;
640                map.serialize_entry("data", &serde_json::json!({ "index": index }))?;
641            }
642            StreamChunk::Complete(response) => {
643                map.serialize_entry("chunk_type", "complete")?;
644                map.serialize_entry("data", response)?;
645            }
646            StreamChunk::Error { message, code } => {
647                map.serialize_entry("chunk_type", "error")?;
648                map.serialize_entry(
649                    "data",
650                    &serde_json::json!({
651                        "message": message,
652                        "code": code,
653                    }),
654                )?;
655            }
656            StreamChunk::Unknown { chunk_type, data } => {
657                map.serialize_entry("chunk_type", chunk_type)?;
658                if !data.is_null() {
659                    map.serialize_entry("data", data)?;
660                }
661            }
662        }
663
664        // Add event_id if present
665        if let Some(event_id) = &self.event_id {
666            map.serialize_entry("event_id", event_id)?;
667        }
668
669        map.end()
670    }
671}
672
673impl<'de> Deserialize<'de> for StreamEvent {
674    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
675    where
676        D: serde::Deserializer<'de>,
677    {
678        let value = serde_json::Value::deserialize(deserializer)?;
679
680        // Extract event_id first
681        let event_id = value
682            .get("event_id")
683            .and_then(|v| v.as_str())
684            .map(String::from);
685
686        // Deserialize the chunk from the same value
687        let chunk: StreamChunk = serde_json::from_value(value).map_err(serde::de::Error::custom)?;
688
689        Ok(Self { chunk, event_id })
690    }
691}
692
693/// Wrapper for SSE streaming events from the Interactions API
694///
695/// The API returns different event types during streaming:
696/// - `interaction.start`: Initial event with interaction data
697/// - `interaction.status_update`: Status changes during processing
698/// - `content.start`: Content generation begins
699/// - `content.delta`: Incremental content updates
700/// - `content.stop`: Content generation ends
701/// - `interaction.complete`: Final complete interaction
702/// - `error`: Error occurred during streaming
703#[derive(Clone, Deserialize, Debug)]
704#[serde(rename_all = "snake_case")]
705pub struct InteractionStreamEvent {
706    /// Event type (e.g., "content.delta", "interaction.complete")
707    pub event_type: String,
708
709    /// The full interaction data (present in "interaction.start" and "interaction.complete")
710    #[serde(skip_serializing_if = "Option::is_none")]
711    pub interaction: Option<InteractionResponse>,
712
713    /// Incremental content delta (present in "content.delta" events)
714    #[serde(skip_serializing_if = "Option::is_none")]
715    pub delta: Option<Content>,
716
717    /// Interaction ID (present in various events like "interaction.status_update")
718    #[serde(skip_serializing_if = "Option::is_none")]
719    pub interaction_id: Option<String>,
720
721    /// Status (present in "interaction.status_update" events)
722    #[serde(skip_serializing_if = "Option::is_none")]
723    pub status: Option<InteractionStatus>,
724
725    /// Position index for content blocks (present in "content.start" and "content.stop")
726    #[serde(skip_serializing_if = "Option::is_none")]
727    pub index: Option<usize>,
728
729    /// Content object being started (present in "content.start" events)
730    #[serde(skip_serializing_if = "Option::is_none")]
731    pub content: Option<Content>,
732
733    /// Error details (present in "error" events)
734    #[serde(skip_serializing_if = "Option::is_none")]
735    pub error: Option<StreamError>,
736
737    /// Event ID for stream resumption.
738    ///
739    /// Pass this to `last_event_id` when calling `get_interaction_stream()` to resume
740    /// the stream from this point after a network interruption.
741    #[serde(skip_serializing_if = "Option::is_none")]
742    pub event_id: Option<String>,
743}
744
745/// Error details from SSE streaming.
746///
747/// Represents error information sent in "error" type SSE events.
748#[derive(Clone, Deserialize, Debug)]
749#[serde(rename_all = "snake_case")]
750pub struct StreamError {
751    /// Human-readable error message
752    #[serde(default)]
753    pub message: String,
754
755    /// Error code from the API (if provided)
756    #[serde(skip_serializing_if = "Option::is_none")]
757    pub code: Option<String>,
758}
759
760#[cfg(test)]
761mod tests {
762    use super::*;
763
764    #[test]
765    fn test_stream_chunk_delta_roundtrip() {
766        let chunk = StreamChunk::Delta(Content::Text {
767            text: Some("Hello, world!".to_string()),
768            annotations: None,
769        });
770
771        let json = serde_json::to_string(&chunk).expect("Serialization should succeed");
772        assert!(json.contains("chunk_type"), "Should have chunk_type tag");
773        assert!(json.contains("delta"), "Should have delta variant");
774        assert!(json.contains("Hello, world!"), "Should have content");
775
776        let deserialized: StreamChunk =
777            serde_json::from_str(&json).expect("Deserialization should succeed");
778
779        match deserialized {
780            StreamChunk::Delta(content) => {
781                assert_eq!(content.as_text(), Some("Hello, world!"));
782            }
783            _ => panic!("Expected Delta variant"),
784        }
785    }
786
#[test]
fn test_stream_chunk_complete_roundtrip() {
    // A `Complete` chunk wrapping a finished interaction is written to JSON and
    // parsed back; the id, status, and answer text must all come back unchanged.
    let question = Content::Text {
        text: Some(String::from("What is 2+2?")),
        annotations: None,
    };
    let answer = Content::Text {
        text: Some(String::from("The answer is 4.")),
        annotations: None,
    };
    let original = StreamChunk::Complete(InteractionResponse {
        id: Some(String::from("test-interaction-123")),
        model: Some(String::from("gemini-3-flash-preview")),
        agent: None,
        input: vec![question],
        outputs: vec![answer],
        status: InteractionStatus::Completed,
        usage: None,
        tools: None,
        grounding_metadata: None,
        url_context_metadata: None,
        previous_interaction_id: None,
        created: None,
        updated: None,
    });

    // Each fragment must appear somewhere in the serialized text.
    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    let expected_fragments = [
        ("chunk_type", "Should have chunk_type tag"),
        ("complete", "Should have complete variant"),
        ("test-interaction-123", "Should have interaction id"),
        ("The answer is 4", "Should have response text"),
    ];
    for (fragment, reason) in expected_fragments {
        assert!(encoded.contains(fragment), "{}", reason);
    }

    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    let StreamChunk::Complete(restored) = decoded else {
        panic!("Expected Complete variant");
    };
    assert_eq!(restored.id.as_deref(), Some("test-interaction-123"));
    assert_eq!(restored.status, InteractionStatus::Completed);
    assert_eq!(restored.as_text(), Some("The answer is 4."));
}
837
#[test]
fn test_stream_chunk_unknown_forward_compatibility() {
    // Payload tagged with a chunk type this enum has no dedicated variant for.
    let wire = r#"{"chunk_type": "future_chunk_type", "data": {"key": "value"}}"#;
    let chunk: StreamChunk =
        serde_json::from_str(wire).expect("Should deserialize unknown variant");

    // It must be reported as unknown, with the original tag readable.
    assert!(chunk.is_unknown());
    assert_eq!(chunk.unknown_chunk_type(), Some("future_chunk_type"));

    // The nested payload must still be reachable.
    let payload = chunk.unknown_data().expect("Should have data");
    assert_eq!(payload["key"], "value");

    // Writing it back out must keep both the tag and the payload value.
    let echoed = serde_json::to_string(&chunk).expect("Should serialize");
    assert!(echoed.contains("future_chunk_type"));
    assert!(echoed.contains("value"));
}
858
#[test]
fn test_stream_chunk_unknown_without_data() {
    // An unrecognized chunk type that carries no `data` member at all.
    let wire = r#"{"chunk_type": "no_data_chunk"}"#;
    let chunk: StreamChunk =
        serde_json::from_str(wire).expect("Should deserialize unknown variant");

    assert!(chunk.is_unknown());
    assert_eq!(chunk.unknown_chunk_type(), Some("no_data_chunk"));

    // The accessor still yields a value; it is expected to be JSON null.
    let payload = chunk.unknown_data().expect("Should have data field");
    assert!(payload.is_null());
}
873
#[test]
fn test_stream_chunk_start_roundtrip() {
    // A `Start` chunk for an in-progress interaction with no outputs yet is
    // serialized and parsed back; id and status must match afterwards.
    let greeting = Content::Text {
        text: Some(String::from("Hello")),
        annotations: None,
    };
    let original = StreamChunk::Start {
        interaction: InteractionResponse {
            id: Some(String::from("test-interaction-456")),
            model: Some(String::from("gemini-3-flash-preview")),
            agent: None,
            input: vec![greeting],
            outputs: Vec::new(),
            status: InteractionStatus::InProgress,
            usage: None,
            tools: None,
            grounding_metadata: None,
            url_context_metadata: None,
            previous_interaction_id: None,
            created: None,
            updated: None,
        },
    };

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    let expected_fragments = [
        ("chunk_type", "Should have chunk_type tag"),
        ("start", "Should have start variant"),
        ("test-interaction-456", "Should have interaction id"),
    ];
    for (fragment, reason) in expected_fragments {
        assert!(encoded.contains(fragment), "{}", reason);
    }

    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    let StreamChunk::Start { interaction } = decoded else {
        panic!("Expected Start variant");
    };
    assert_eq!(interaction.id.as_deref(), Some("test-interaction-456"));
    assert_eq!(interaction.status, InteractionStatus::InProgress);
}
918
#[test]
fn test_stream_chunk_status_update_roundtrip() {
    // Round-trips a `StatusUpdate` chunk through JSON and checks that both of
    // its fields come back with the values they were built with.
    let original = StreamChunk::StatusUpdate {
        interaction_id: String::from("test-interaction-789"),
        status: InteractionStatus::RequiresAction,
    };

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    assert!(encoded.contains("chunk_type"), "Should have chunk_type tag");
    assert!(
        encoded.contains("status_update"),
        "Should have status_update variant"
    );
    assert!(
        encoded.contains("test-interaction-789"),
        "Should have interaction id"
    );

    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    let StreamChunk::StatusUpdate {
        interaction_id,
        status,
    } = decoded
    else {
        panic!("Expected StatusUpdate variant");
    };
    assert_eq!(interaction_id, "test-interaction-789");
    assert_eq!(status, InteractionStatus::RequiresAction);
}
951
#[test]
fn test_stream_chunk_content_start_roundtrip() {
    // Round-trips a `ContentStart` chunk (index 0, content type "text").
    let original = StreamChunk::ContentStart {
        index: 0,
        content_type: Some(String::from("text")),
    };

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    let expected_fragments = [
        ("chunk_type", "Should have chunk_type tag"),
        ("content_start", "Should have content_start variant"),
        ("\"index\":0", "Should have index"),
        ("text", "Should have content_type"),
    ];
    for (fragment, reason) in expected_fragments {
        assert!(encoded.contains(fragment), "{}", reason);
    }

    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    let StreamChunk::ContentStart {
        index,
        content_type,
    } = decoded
    else {
        panic!("Expected ContentStart variant");
    };
    assert_eq!(index, 0);
    assert_eq!(content_type.as_deref(), Some("text"));
}
982
#[test]
fn test_stream_chunk_content_stop_roundtrip() {
    // Round-trips a `ContentStop` chunk and checks the index is carried through.
    let original = StreamChunk::ContentStop { index: 1 };

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    assert!(encoded.contains("chunk_type"), "Should have chunk_type tag");
    assert!(
        encoded.contains("content_stop"),
        "Should have content_stop variant"
    );
    assert!(encoded.contains("\"index\":1"), "Should have index");

    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    if let StreamChunk::ContentStop { index } = decoded {
        assert_eq!(index, 1);
    } else {
        panic!("Expected ContentStop variant");
    }
}
1005
#[test]
fn test_stream_chunk_error_roundtrip() {
    // Round-trips an `Error` chunk that has both a message and a code.
    let original = StreamChunk::Error {
        message: String::from("Rate limit exceeded"),
        code: Some(String::from("RATE_LIMIT")),
    };

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    let expected_fragments = [
        ("chunk_type", "Should have chunk_type tag"),
        ("error", "Should have error variant"),
        ("Rate limit exceeded", "Should have message"),
        ("RATE_LIMIT", "Should have code"),
    ];
    for (fragment, reason) in expected_fragments {
        assert!(encoded.contains(fragment), "{}", reason);
    }

    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    let StreamChunk::Error { message, code } = decoded else {
        panic!("Expected Error variant");
    };
    assert_eq!(message, "Rate limit exceeded");
    assert_eq!(code.as_deref(), Some("RATE_LIMIT"));
}
1030
#[test]
fn test_stream_chunk_error_without_code() {
    // An `Error` chunk built with `code: None` must come back from a JSON
    // round trip with the same message and still no code.
    let original = StreamChunk::Error {
        message: String::from("Unknown error"),
        code: None,
    };

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    let decoded: StreamChunk =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");

    if let StreamChunk::Error { message, code } = decoded {
        assert_eq!(message, "Unknown error");
        assert!(code.is_none());
    } else {
        panic!("Expected Error variant");
    }
}
1050
#[test]
fn test_stream_chunk_helper_methods() {
    // Fixture: an otherwise-empty response where only id and status vary.
    fn bare_response(id: Option<&str>, status: InteractionStatus) -> InteractionResponse {
        InteractionResponse {
            id: id.map(String::from),
            model: None,
            agent: None,
            input: Vec::new(),
            outputs: Vec::new(),
            status,
            usage: None,
            tools: None,
            grounding_metadata: None,
            url_context_metadata: None,
            previous_interaction_id: None,
            created: None,
            updated: None,
        }
    }

    // One chunk per variant exercised below.
    let start = StreamChunk::Start {
        interaction: bare_response(Some("start-id"), InteractionStatus::InProgress),
    };
    let status_update = StreamChunk::StatusUpdate {
        interaction_id: String::from("status-id"),
        status: InteractionStatus::InProgress,
    };
    let delta = StreamChunk::Delta(Content::Text {
        text: Some(String::from("test")),
        annotations: None,
    });
    let complete = StreamChunk::Complete(bare_response(None, InteractionStatus::Completed));
    let error = StreamChunk::Error {
        message: String::from("test"),
        code: None,
    };

    // interaction_id(): present for Start and StatusUpdate, absent for Delta.
    assert_eq!(start.interaction_id(), Some("start-id"));
    assert_eq!(status_update.interaction_id(), Some("status-id"));
    assert_eq!(delta.interaction_id(), None);

    // is_terminal(): true for Complete and Error, false for Delta and Start.
    assert!(complete.is_terminal());
    assert!(error.is_terminal());
    assert!(!delta.is_terminal());
    assert!(!start.is_terminal());

    // status(): reported for StatusUpdate and Complete, absent for Delta.
    assert_eq!(status_update.status(), Some(&InteractionStatus::InProgress));
    assert_eq!(complete.status(), Some(&InteractionStatus::Completed));
    assert_eq!(delta.status(), None);
}
1117
#[test]
fn test_stream_event_with_event_id_roundtrip() {
    // A `StreamEvent` wrapping a text delta and carrying an event id.
    let payload = StreamChunk::Delta(Content::Text {
        text: Some(String::from("Hello")),
        annotations: None,
    });
    let original = StreamEvent::new(payload, Some(String::from("evt_abc123")));

    // Classification helpers on the freshly built event.
    assert!(original.is_delta());
    assert!(!original.is_complete());
    assert!(!original.is_unknown());

    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    assert!(encoded.contains("evt_abc123"), "Should have event_id");
    assert!(encoded.contains("Hello"), "Should have content");

    // After the round trip the event id and the delta classification remain.
    let decoded: StreamEvent =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    assert_eq!(decoded.event_id.as_deref(), Some("evt_abc123"));
    assert!(decoded.is_delta());
}
1142
#[test]
fn test_stream_event_without_event_id() {
    // A `StreamEvent` wrapping a completed interaction, built with no event id.
    let reply = Content::Text {
        text: Some(String::from("Response")),
        annotations: None,
    };
    let finished = InteractionResponse {
        id: Some(String::from("interaction-123")),
        model: Some(String::from("gemini-3-flash-preview")),
        agent: None,
        input: Vec::new(),
        outputs: vec![reply],
        status: InteractionStatus::Completed,
        usage: None,
        tools: None,
        grounding_metadata: None,
        url_context_metadata: None,
        previous_interaction_id: None,
        created: None,
        updated: None,
    };
    let original = StreamEvent::new(StreamChunk::Complete(finished), None);

    assert!(original.is_complete());
    assert!(!original.is_delta());
    assert!(original.event_id.is_none());

    // With no id set, the key must not appear in the serialized text at all.
    let encoded = serde_json::to_string(&original).expect("Serialization should succeed");
    assert!(
        !encoded.contains("event_id"),
        "Should not have event_id field"
    );

    let decoded: StreamEvent =
        serde_json::from_str(&encoded).expect("Deserialization should succeed");
    assert!(decoded.event_id.is_none());
    assert!(decoded.is_complete());
}
1179
#[test]
fn test_interaction_stream_event_with_event_id() {
    // Hand-written payload containing an event type, a text delta, and an
    // event id; all three must be readable after deserialization.
    let wire = r#"{
            "event_type": "content.delta",
            "delta": {"type": "text", "text": "Hello"},
            "event_id": "evt_resume_token_123"
        }"#;

    let parsed: InteractionStreamEvent = serde_json::from_str(wire).expect("Should deserialize");
    assert_eq!(parsed.event_type, "content.delta");
    assert_eq!(parsed.event_id.as_deref(), Some("evt_resume_token_123"));
    assert!(parsed.delta.is_some());
}
1193}