dynamo_llm/protocols/
codec.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3
4//! A module for parsing Server-Sent Events (SSE) streams according to the SSE specification.
5//!
//! This module provides `SseLineCodec`, a codec for decoding SSE streams into `Message` values.
//! It handles parsing of `id`, `event`, `data`, and comments. The `data` payload is kept as raw
//! text; use `Message::decode_data` to deserialize it into a caller-chosen type `T`.
9//!
10
11// TODO: Determine if we should use an External EventSource crate. There appear to be several
12// potential candidates.
13
14use std::{io::Cursor, pin::Pin};
15
16use bytes::BytesMut;
17use futures::Stream;
18use serde::Deserialize;
19use tokio_util::codec::{Decoder, FramedRead, LinesCodec};
20
21use super::Annotated;
22
/// An error that occurs when decoding an SSE stream.
#[derive(Debug, thiserror::Error)]
pub enum SseCodecError {
    /// A line could not be framed, or a message's `data` payload could not be
    /// deserialized. The string carries the underlying error's description.
    #[error("SseLineCodec decode error: {0}")]
    DecodeError(String),

    /// An I/O failure, converted automatically from [`std::io::Error`] via `#[from]`.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),
}
32
/// A codec for decoding SSE streams into [`Message`] instances.
///
/// Input is split into lines by an inner [`LinesCodec`]; the fields seen on those lines are
/// accumulated in the buffers below until an empty line (or end of stream) causes them to be
/// emitted as one [`Message`].
///
/// The codec itself is not generic: the `data` field is kept as text, and callers choose the
/// target type later through [`Message::decode_data`].
pub struct SseLineCodec {
    /// Splits the raw byte stream into lines.
    lines_codec: LinesCodec,
    /// Accumulated `data:` values for the event in progress, joined with `'\n'`.
    data_buffer: String,
    /// Most recent `event:` value for the event in progress.
    event_type_buffer: String,
    /// Most recent `id:` value for the event in progress.
    last_event_id_buffer: String,
    /// Comment lines (those starting with `:`) seen for the event in progress.
    comments_buffer: Vec<String>,
}
48
/// Represents a parsed SSE message.
///
/// The `Message` struct contains optional fields for `id`, `event`, `data`, and a vector of
/// `comments`. A field is `None` when the event did not carry it (or carried only an empty
/// value).
///
/// `data` holds the raw, still-serialized payload; use [`Message::decode_data`] to deserialize
/// it into a concrete type.
#[derive(Debug)]
pub struct Message {
    /// Value of the event's `id` field, if any.
    pub id: Option<String>,
    /// Value of the event's `event` field, if any.
    pub event: Option<String>,
    /// Raw text of the event's `data` field(s), if any.
    pub data: Option<String>,
    /// Comment lines attached to the event, if any.
    pub comments: Option<Vec<String>>,
}
63
64impl Message {
65    /// Deserializes the `data` field into the specified type `T`.
66    ///
67    /// # Errors
68    ///
69    /// Returns an error if the `data` field is empty or if deserialization fails.
70    pub fn decode_data<T>(&self) -> Result<T, SseCodecError>
71    where
72        T: for<'de> Deserialize<'de>,
73    {
74        serde_json::from_str(self.data.as_ref().ok_or(SseCodecError::DecodeError(
75            "no data: message to decode".to_string(),
76        ))?)
77        .map_err(|e| SseCodecError::DecodeError(format!("failed to deserialized data: {}", e)))
78    }
79}
80
81impl<T> TryFrom<Message> for Annotated<T>
82where
83    T: for<'de> Deserialize<'de>,
84{
85    type Error = String;
86
87    fn try_from(value: Message) -> Result<Annotated<T>, Self::Error> {
88        // determine if the message had an error
89        if let Some(event) = value.event.as_ref()
90            && event == "error"
91        {
92            let message = match &value.comments {
93                Some(comments) => comments.join("\n"),
94                None => "`event: error` detected, but no error message found".to_string(),
95            };
96            return Err(message);
97        }
98
99        // try to deserialize the data to T
100
101        let data: Option<T> = match &value.data {
102            Some(_) => value.decode_data().map_err(|e| e.to_string())?,
103            None => None,
104        };
105
106        Ok(Annotated {
107            data,
108            id: value.id,
109            event: value.event,
110            comment: value.comments,
111        })
112    }
113}
114
115impl SseLineCodec {
116    /// Creates a new `SseLineCodec<T>`.
117    pub fn new() -> Self {
118        Self::default()
119    }
120}
121
122impl Default for SseLineCodec {
123    fn default() -> Self {
124        Self {
125            lines_codec: LinesCodec::new(),
126            data_buffer: String::new(),
127            event_type_buffer: String::new(),
128            last_event_id_buffer: String::new(),
129            comments_buffer: Vec::new(),
130        }
131    }
132}
133
134impl Decoder for SseLineCodec {
135    type Item = Message;
136    type Error = SseCodecError;
137
138    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
139        loop {
140            match self
141                .lines_codec
142                .decode(src)
143                .map_err(|e| SseCodecError::DecodeError(e.to_string()))?
144            {
145                Some(line) => {
146                    let line = line.trim_end_matches(&['\r', '\n'][..]);
147                    if line.is_empty() {
148                        // End of event; dispatch
149                        if !self.data_buffer.is_empty()
150                            || !self.event_type_buffer.is_empty()
151                            || !self.last_event_id_buffer.is_empty()
152                            || !self.comments_buffer.is_empty()
153                        {
154                            // Remove the last '\n' if present in data_buffer
155                            if self.data_buffer.ends_with('\n') {
156                                self.data_buffer.pop();
157                            }
158
159                            let data = if !self.data_buffer.is_empty() {
160                                Some(std::mem::take(&mut self.data_buffer))
161                            } else {
162                                None
163                            };
164
165                            let message = Message {
166                                id: if self.last_event_id_buffer.is_empty() {
167                                    None
168                                } else {
169                                    Some(std::mem::take(&mut self.last_event_id_buffer))
170                                },
171                                event: if self.event_type_buffer.is_empty() {
172                                    None
173                                } else {
174                                    Some(std::mem::take(&mut self.event_type_buffer))
175                                },
176                                data,
177                                comments: if self.comments_buffer.is_empty() {
178                                    None
179                                } else {
180                                    Some(std::mem::take(&mut self.comments_buffer))
181                                },
182                            };
183                            // No need to clear the buffers; they've been replaced with empty values
184                            return Ok(Some(message));
185                        } else {
186                            // No data to dispatch; continue
187                            continue;
188                        }
189                    } else if let Some(comment) = line.strip_prefix(':') {
190                        self.comments_buffer.push(comment.trim().into());
191                    } else {
192                        let (field_name, field_value) = if let Some(idx) = line.find(':') {
193                            let (name, value) = line.split_at(idx);
194                            let value = value[1..].trim_start_matches(' ');
195                            (name, value)
196                        } else {
197                            (line, "")
198                        };
199
200                        match field_name {
201                            "event" => {
202                                self.event_type_buffer = field_value.to_string();
203                            }
204                            "data" => {
205                                if field_value != "[DONE]" {
206                                    if !self.data_buffer.is_empty() {
207                                        self.data_buffer.push('\n');
208                                    }
209                                    self.data_buffer.push_str(field_value);
210                                }
211                            }
212                            "id" => {
213                                if !field_value.contains('\0') {
214                                    self.last_event_id_buffer = field_value.to_string();
215                                }
216                            }
217                            "retry" => {
218                                // For simplicity, we'll ignore retry in this implementation
219                            }
220                            _ => {
221                                // Ignore unknown fields
222                            }
223                        }
224                    }
225                }
226                None => {
227                    // No more data available at the moment
228                    return Ok(None);
229                }
230            }
231        }
232    }
233
234    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
235        // Attempt to process any remaining data
236        let result = self.decode(src)?;
237        if result.is_some() {
238            return Ok(result);
239        }
240        // If there's no data left to process, return None
241        if self.data_buffer.is_empty()
242            && self.event_type_buffer.is_empty()
243            && self.last_event_id_buffer.is_empty()
244            && self.comments_buffer.is_empty()
245        {
246            Ok(None)
247        } else {
248            // Dispatch any remaining data as an event
249            if self.data_buffer.ends_with('\n') {
250                self.data_buffer.pop();
251            }
252
253            let data = if !self.data_buffer.is_empty() {
254                Some(std::mem::take(&mut self.data_buffer))
255            } else {
256                None
257            };
258
259            let message = Message {
260                id: if self.last_event_id_buffer.is_empty() {
261                    None
262                } else {
263                    Some(std::mem::take(&mut self.last_event_id_buffer))
264                },
265                event: if self.event_type_buffer.is_empty() {
266                    None
267                } else {
268                    Some(std::mem::take(&mut self.event_type_buffer))
269                },
270                data,
271                comments: if self.comments_buffer.is_empty() {
272                    None
273                } else {
274                    Some(std::mem::take(&mut self.comments_buffer))
275                },
276            };
277            // No need to clear the buffers; they've been replaced with empty values
278            Ok(Some(message))
279        }
280    }
281}
282
283/// Creates a stream of `Message` instances from a text stream of SSE events.
284pub fn create_message_stream(
285    text: &str,
286) -> Pin<Box<dyn Stream<Item = Result<Message, SseCodecError>> + Send + Sync>> {
287    let cursor = Cursor::new(text.to_string());
288    let framed = FramedRead::new(cursor, SseLineCodec::new());
289    Box::pin(framed)
290}
291
292#[cfg(test)]
293mod tests {
294    use std::io::Cursor;
295
296    use futures::stream::StreamExt;
297    use tokio_util::codec::FramedRead;
298
299    use super::*;
300
    /// Minimal JSON payload type used by the tests to exercise `data` deserialization.
    #[derive(Deserialize, Debug, PartialEq)]
    struct TestData {
        /// Text carried under the payload's `message` key.
        message: String,
    }
305
306    #[tokio::test]
307    async fn test_message_with_all_fields() {
308        let sample_data = r#"id: 123
309event: test
310data: {"message": "Hello World"}
311: This is a comment
312
313"#;
314        let cursor = Cursor::new(sample_data);
315        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
316
317        if let Some(Ok(message)) = framed.next().await {
318            assert_eq!(message.id, Some("123".to_string()));
319            assert_eq!(message.event, Some("test".to_string()));
320            assert_eq!(
321                message.comments,
322                Some(vec!["This is a comment".to_string()])
323            );
324            let data: TestData = message.decode_data().unwrap();
325            assert_eq!(data.message, "Hello World".to_string());
326        } else {
327            panic!("Expected a message");
328        }
329    }
330
331    #[tokio::test]
332    async fn test_message_with_only_data() {
333        let sample_data = r#"data: {"message": "Just some data"}
334
335"#;
336        let cursor = Cursor::new(sample_data);
337        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
338
339        if let Some(Ok(message)) = framed.next().await {
340            assert!(message.id.is_none());
341            assert!(message.event.is_none());
342            assert!(message.comments.is_none());
343            let data: TestData = message.decode_data().unwrap();
344            assert_eq!(data.message, "Just some data".to_string());
345        } else {
346            panic!("Expected a message");
347        }
348    }
349
350    #[tokio::test]
351    async fn test_message_with_only_comment() {
352        let sample_data = r#": This is a comment
353
354"#;
355        let cursor = Cursor::new(sample_data);
356        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
357
358        if let Some(Ok(message)) = framed.next().await {
359            assert!(message.id.is_none());
360            assert!(message.event.is_none());
361            assert!(message.data.is_none());
362            assert_eq!(
363                message.comments,
364                Some(vec!["This is a comment".to_string()])
365            );
366        } else {
367            panic!("Expected a message");
368        }
369    }
370
371    #[tokio::test]
372    async fn test_message_with_multiple_comments() {
373        let sample_data = r#": First comment
374: Second comment
375
376"#;
377        let cursor = Cursor::new(sample_data);
378        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
379
380        if let Some(Ok(message)) = framed.next().await {
381            assert!(message.id.is_none());
382            assert!(message.event.is_none());
383            assert!(message.data.is_none());
384            assert_eq!(
385                message.comments,
386                Some(vec![
387                    "First comment".to_string(),
388                    "Second comment".to_string()
389                ])
390            );
391        } else {
392            panic!("Expected a message");
393        }
394    }
395
396    #[tokio::test]
397    async fn test_message_with_partial_fields() {
398        let sample_data = r#"id: 456
399data: {"message": "Partial data"}
400
401"#;
402        let cursor = Cursor::new(sample_data);
403        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
404
405        if let Some(Ok(message)) = framed.next().await {
406            assert_eq!(message.id, Some("456".to_string()));
407            assert!(message.event.is_none());
408            assert!(message.comments.is_none());
409            let data: TestData = message.decode_data().unwrap();
410            assert_eq!(data.message, "Partial data".to_string());
411        } else {
412            panic!("Expected a message");
413        }
414    }
415
416    #[tokio::test]
417    async fn test_message_with_invalid_json_data() {
418        let sample_data = r#"data: {"message": "Invalid JSON
419
420"#;
421        let cursor = Cursor::new(sample_data);
422        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
423
424        if let Some(result) = framed.next().await {
425            match result {
426                Ok(message) => {
427                    // got a message, but it has invalid json
428                    let data = message.decode_data::<TestData>();
429                    assert!(data.is_err(), "Expected an error; got {:?}", data);
430                }
431                _ => panic!("Expected a message"),
432            }
433        } else {
434            panic!("Expected an error");
435        }
436    }
437
438    #[tokio::test]
439    async fn test_message_with_missing_data_field() {
440        let sample_data = r#"id: 789
441event: test_event
442
443"#;
444        let cursor = Cursor::new(sample_data);
445        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
446
447        if let Some(Ok(message)) = framed.next().await {
448            assert_eq!(message.id, Some("789".to_string()));
449            assert_eq!(message.event, Some("test_event".to_string()));
450            assert!(message.data.is_none());
451            assert!(message.comments.is_none());
452        } else {
453            panic!("Expected a message");
454        }
455    }
456
457    #[tokio::test]
458    async fn test_message_with_empty_data_field() {
459        let sample_data = r#"data:
460
461"#;
462        let cursor = Cursor::new(sample_data);
463        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
464
465        if let Some(result) = framed.next().await {
466            match result {
467                Ok(_) => {
468                    panic!("Expected no message");
469                }
470                Err(e) => panic!("Unexpected error: {}", e),
471            }
472        } else {
473            // no message is emitted
474        }
475    }
476
477    #[tokio::test]
478    async fn test_message_with_multiple_data_lines() {
479        let sample_data = r#"data: {"message": "Line1"}
480data: {"message": "Line2"}
481
482"#;
483        let cursor = Cursor::new(sample_data);
484        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
485
486        if let Some(result) = framed.next().await {
487            match result {
488                Ok(message) => {
489                    // got a message with data, but the data is junk
490                    let data = message.decode_data::<TestData>();
491                    assert!(data.is_err(), "Expected an error; got {:?}", data);
492                }
493                _ => panic!("Expected a message"),
494            }
495        } else {
496            panic!("Expected an error");
497        }
498    }
499
500    #[tokio::test]
501    async fn test_message_with_unrecognized_field() {
502        let sample_data = r#"unknown: value
503data: {"message": "Hello"}
504
505"#;
506        let cursor = Cursor::new(sample_data);
507        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
508
509        if let Some(Ok(message)) = framed.next().await {
510            // Unrecognized fields are ignored
511            assert!(message.id.is_none());
512            assert!(message.event.is_none());
513            assert!(message.comments.is_none());
514            let data: TestData = message.decode_data().unwrap();
515            assert_eq!(data.message, "Hello".to_string());
516        } else {
517            panic!("Expected a message");
518        }
519    }
520
521    // data recorded on 2024-09-30 from
522    // + curl https://integrate.api.nvidia.com/v1/chat/completions -H 'Content-Type: application/json' \
523    //     -H 'Authorization: Bearer nvapi-<redacted>' -d '{
524    //     "model": "mistralai/mixtral-8x22b-instruct-v0.1",
525    //     "messages": [{"role":"user","content":"Write a limerick about the wonders of GPU computing."}],
526    //     "temperature": 0.5,
527    //     "top_p": 1,
528    //     "max_tokens": 64,
529    //     "stream": true
530    //   }'
531    const SAMPLE_CHAT_DATA: &str = r#"
532data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":"assistant","content":null},"logprobs":null,"finish_reason":null}]}
533
534data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"A"},"logprobs":null,"finish_reason":null}]}
535
536data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" GPU"},"logprobs":null,"finish_reason":null}]}
537
538data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
539
540data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" swift"},"logprobs":null,"finish_reason":null}]}
541
542data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" and"},"logprobs":null,"finish_reason":null}]}
543
544data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
545
546data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" clever"},"logprobs":null,"finish_reason":null}]}
547
548data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
549
550data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
551
552data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"In"},"logprobs":null,"finish_reason":null}]}
553
554data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" comput"},"logprobs":null,"finish_reason":null}]}
555
556data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ations"},"logprobs":null,"finish_reason":null}]}
557
558data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
559
560data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"'"},"logprobs":null,"finish_reason":null}]}
561
562data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"s"},"logprobs":null,"finish_reason":null}]}
563
564data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" quite"},"logprobs":null,"finish_reason":null}]}
565
566data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" the"},"logprobs":null,"finish_reason":null}]}
567
568data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ende"},"logprobs":null,"finish_reason":null}]}
569
570data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"avor"},"logprobs":null,"finish_reason":null}]}
571
572data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"."},"logprobs":null,"finish_reason":null}]}
573
574data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
575
576data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"With"},"logprobs":null,"finish_reason":null}]}
577
578data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" its"},"logprobs":null,"finish_reason":null}]}
579
580data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" thousands"},"logprobs":null,"finish_reason":null}]}
581
582data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" of"},"logprobs":null,"finish_reason":null}]}
583
584data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" co"},"logprobs":null,"finish_reason":null}]}
585
586data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"res"},"logprobs":null,"finish_reason":null}]}
587
588data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
589
590data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
591
592data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"On"},"logprobs":null,"finish_reason":null}]}
593
594data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" complex"},"logprobs":null,"finish_reason":null}]}
595
596data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" tasks"},"logprobs":null,"finish_reason":null}]}
597
598data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
599
600data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ro"},"logprobs":null,"finish_reason":null}]}
601
602data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ars"},"logprobs":null,"finish_reason":null}]}
603
604data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
605
606data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
607
608data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"S"},"logprobs":null,"finish_reason":null}]}
609
610data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"olving"},"logprobs":null,"finish_reason":null}]}
611
612data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" problems"},"logprobs":null,"finish_reason":null}]}
613
614data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" like"},"logprobs":null,"finish_reason":null}]}
615
616data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" never"},"logprobs":null,"finish_reason":null}]}
617
618data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
619
620data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" forever"},"logprobs":null,"finish_reason":null}]}
621
622data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"!"},"logprobs":null,"finish_reason":null}]}
623
624data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":""},"logprobs":null,"finish_reason":"stop","stop_reason":null}]}
625
626data: [DONE]
627
628"#;
629
630    #[tokio::test]
631    async fn test_openai_chat_stream() {
632        use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
633
634        // let cursor = Cursor::new(SAMPLE_CHAT_DATA);
635        // let mut framed = FramedRead::new(cursor, SseLineCodec::new());
636
637        let mut stream = create_message_stream(SAMPLE_CHAT_DATA);
638
639        let mut counter = 0;
640
641        loop {
642            match stream.next().await {
643                Some(Ok(message)) => {
644                    let delta: NvCreateChatCompletionStreamResponse =
645                        serde_json::from_str(&message.data.unwrap()).unwrap();
646                    counter += 1;
647                    println!("counter: {}", counter);
648                    println!("delta: {:?}", delta);
649                }
650                Some(Err(e)) => {
651                    panic!("Error: {:?}", e);
652                }
653                None => {
654                    break;
655                }
656            }
657        }
658
659        assert_eq!(counter, 47);
660    }
661
662    #[test]
663    fn test_successful_conversion() {
664        let message = Message {
665            id: Some("123".to_string()),
666            event: Some("update".to_string()),
667            data: Some(r#"{"message": "Hello World"}"#.to_string()),
668            comments: Some(vec!["Some comment".to_string()]),
669        };
670
671        let annotated: Annotated<TestData> = message.try_into().unwrap();
672
673        assert_eq!(annotated.id, Some("123".to_string()));
674        assert_eq!(annotated.event, Some("update".to_string()));
675        assert_eq!(annotated.comment, Some(vec!["Some comment".to_string()]));
676        assert_eq!(
677            annotated.data,
678            Some(TestData {
679                message: "Hello World".to_string()
680            })
681        );
682    }
683
684    #[test]
685    fn test_error_event_with_comments() {
686        let message = Message {
687            id: Some("456".to_string()),
688            event: Some("error".to_string()),
689            data: Some("Error data".to_string()),
690            comments: Some(vec!["An error occurred".to_string()]),
691        };
692
693        let result: Result<Annotated<TestData>, _> = message.try_into();
694
695        assert!(result.is_err());
696        assert_eq!(result.unwrap_err(), "An error occurred".to_string());
697    }
698
699    #[test]
700    fn test_error_event_without_comments() {
701        let message = Message {
702            id: Some("789".to_string()),
703            event: Some("error".to_string()),
704            data: Some("Error data".to_string()),
705            comments: None,
706        };
707
708        let result: Result<Annotated<TestData>, _> = message.try_into();
709
710        assert!(result.is_err());
711    }
712
713    #[test]
714    fn test_invalid_json_data() {
715        let message = Message {
716            id: None,
717            event: Some("update".to_string()),
718            data: Some("Invalid JSON".to_string()),
719            comments: None,
720        };
721
722        let result: Result<Annotated<TestData>, _> = message.try_into();
723
724        assert!(result.is_err());
725    }
726
727    #[test]
728    fn test_missing_data_field() {
729        let message = Message {
730            id: None,
731            event: Some("update".to_string()),
732            data: None,
733            comments: None,
734        };
735
736        let result: Result<Annotated<TestData>, _> = message.try_into();
737
738        assert!(result.is_ok());
739        let annotated = result.unwrap();
740        assert!(annotated.data.is_none());
741        assert_eq!(annotated.event, Some("update".to_string()));
742    }
743}