dynamo_llm/protocols/
codec.rs

1// SPDX-FileCopyrightText: Copyright (c) 2024-2025 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
2// SPDX-License-Identifier: Apache-2.0
3//
4// Licensed under the Apache License, Version 2.0 (the "License");
5// you may not use this file except in compliance with the License.
6// You may obtain a copy of the License at
7//
8// http://www.apache.org/licenses/LICENSE-2.0
9//
10// Unless required by applicable law or agreed to in writing, software
11// distributed under the License is distributed on an "AS IS" BASIS,
12// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13// See the License for the specific language governing permissions and
14// limitations under the License.
15
//! A module for parsing Server-Sent Events (SSE) streams according to the SSE specification.
//!
//! This module provides `SseLineCodec`, a codec for decoding SSE streams into `Message` values.
//! It handles parsing of `id`, `event`, `data`, and comments. The `data` field is kept as a raw
//! string and can be deserialized into a type `T` with `Message::decode_data`.
//!
22
23// TODO: Determine if we should use an External EventSource crate. There appear to be several
24// potential candidates.
25
26use bytes::BytesMut;
27use futures::Stream;
28use serde::Deserialize;
29use std::{io::Cursor, pin::Pin};
30use tokio_util::codec::{Decoder, FramedRead, LinesCodec};
31
32use super::Annotated;
33
/// An error that occurs when decoding an SSE stream.
#[derive(Debug, thiserror::Error)]
pub enum SseCodecError {
    /// A decoding failure described by its message text. In this file it is produced when the
    /// inner line codec fails, when a message has no `data` to decode, and when JSON
    /// deserialization of `data` fails.
    #[error("SseLineCodec decode error: {0}")]
    DecodeError(String),

    /// An I/O error; `#[from]` generates the `From<std::io::Error>` conversion for this variant.
    #[error("IO error: {0}")]
    IoError(#[from] std::io::Error),
}
43
/// A codec for decoding SSE streams into [`Message`] instances.
///
/// Input is split into lines by the inner `LinesCodec`. Field values and comments are collected
/// in the buffers below until the event ends, at which point they are moved into a [`Message`].
/// The `data` payload is kept as a raw string; use [`Message::decode_data`] to deserialize it.
pub struct SseLineCodec {
    // Splits the incoming bytes into lines.
    lines_codec: LinesCodec,
    // Accumulated `data` field values; multiple `data` lines are joined with '\n'.
    data_buffer: String,
    // Value of the most recent `event` field of the pending event.
    event_type_buffer: String,
    // Value of the most recent `id` field of the pending event.
    last_event_id_buffer: String,
    // Text of the comment lines (lines starting with ':') of the pending event.
    comments_buffer: Vec<String>,
}
59
/// Represents a parsed SSE message.
///
/// The `Message` struct contains optional fields for `id`, `event`, `data`, and a vector of `comments`.
/// A field is `None` when the codec buffered nothing for it. `data` is the raw text of the
/// event; call [`Message::decode_data`] to deserialize it.
#[derive(Debug)]
pub struct Message {
    /// Value of the `id` field, if any.
    pub id: Option<String>,
    /// Value of the `event` field, if any.
    pub event: Option<String>,
    /// Raw `data` payload; multiple `data` lines are joined with '\n'.
    pub data: Option<String>,
    /// Comment lines of the event with the leading ':' removed and surrounding whitespace trimmed.
    pub comments: Option<Vec<String>>,
}
74
75impl Message {
76    /// Deserializes the `data` field into the specified type `T`.
77    ///
78    /// # Errors
79    ///
80    /// Returns an error if the `data` field is empty or if deserialization fails.
81    pub fn decode_data<T>(&self) -> Result<T, SseCodecError>
82    where
83        T: for<'de> Deserialize<'de>,
84    {
85        serde_json::from_str(self.data.as_ref().ok_or(SseCodecError::DecodeError(
86            "no data: message to decode".to_string(),
87        ))?)
88        .map_err(|e| SseCodecError::DecodeError(format!("failed to deserialized data: {}", e)))
89    }
90}
91
92impl<T> TryFrom<Message> for Annotated<T>
93where
94    T: for<'de> Deserialize<'de>,
95{
96    type Error = String;
97
98    fn try_from(value: Message) -> Result<Annotated<T>, Self::Error> {
99        // determine if the message had an error
100        if let Some(event) = value.event.as_ref() {
101            if event == "error" {
102                let message = match &value.comments {
103                    Some(comments) => comments.join("\n"),
104                    None => "`event: error` detected, but no error message found".to_string(),
105                };
106                return Err(message);
107            }
108        }
109
110        // try to deserialize the data to T
111
112        let data: Option<T> = match &value.data {
113            Some(_) => value.decode_data().map_err(|e| e.to_string())?,
114            None => None,
115        };
116
117        Ok(Annotated {
118            data,
119            id: value.id,
120            event: value.event,
121            comment: value.comments,
122        })
123    }
124}
125
126impl SseLineCodec {
127    /// Creates a new `SseLineCodec<T>`.
128    pub fn new() -> Self {
129        Self::default()
130    }
131}
132
133impl Default for SseLineCodec {
134    fn default() -> Self {
135        Self {
136            lines_codec: LinesCodec::new(),
137            data_buffer: String::new(),
138            event_type_buffer: String::new(),
139            last_event_id_buffer: String::new(),
140            comments_buffer: Vec::new(),
141        }
142    }
143}
144
145impl Decoder for SseLineCodec {
146    type Item = Message;
147    type Error = SseCodecError;
148
149    fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
150        loop {
151            match self
152                .lines_codec
153                .decode(src)
154                .map_err(|e| SseCodecError::DecodeError(e.to_string()))?
155            {
156                Some(line) => {
157                    let line = line.trim_end_matches(&['\r', '\n'][..]);
158                    if line.is_empty() {
159                        // End of event; dispatch
160                        if !self.data_buffer.is_empty()
161                            || !self.event_type_buffer.is_empty()
162                            || !self.last_event_id_buffer.is_empty()
163                            || !self.comments_buffer.is_empty()
164                        {
165                            // Remove the last '\n' if present in data_buffer
166                            if self.data_buffer.ends_with('\n') {
167                                self.data_buffer.pop();
168                            }
169
170                            let data = if !self.data_buffer.is_empty() {
171                                Some(std::mem::take(&mut self.data_buffer))
172                            } else {
173                                None
174                            };
175
176                            let message = Message {
177                                id: if self.last_event_id_buffer.is_empty() {
178                                    None
179                                } else {
180                                    Some(std::mem::take(&mut self.last_event_id_buffer))
181                                },
182                                event: if self.event_type_buffer.is_empty() {
183                                    None
184                                } else {
185                                    Some(std::mem::take(&mut self.event_type_buffer))
186                                },
187                                data,
188                                comments: if self.comments_buffer.is_empty() {
189                                    None
190                                } else {
191                                    Some(std::mem::take(&mut self.comments_buffer))
192                                },
193                            };
194                            // No need to clear the buffers; they've been replaced with empty values
195                            return Ok(Some(message));
196                        } else {
197                            // No data to dispatch; continue
198                            continue;
199                        }
200                    } else if let Some(comment) = line.strip_prefix(':') {
201                        self.comments_buffer.push(comment.trim().into());
202                    } else {
203                        let (field_name, field_value) = if let Some(idx) = line.find(':') {
204                            let (name, value) = line.split_at(idx);
205                            let value = value[1..].trim_start_matches(' ');
206                            (name, value)
207                        } else {
208                            (line, "")
209                        };
210
211                        match field_name {
212                            "event" => {
213                                self.event_type_buffer = field_value.to_string();
214                            }
215                            "data" => {
216                                if field_value != "[DONE]" {
217                                    if !self.data_buffer.is_empty() {
218                                        self.data_buffer.push('\n');
219                                    }
220                                    self.data_buffer.push_str(field_value);
221                                }
222                            }
223                            "id" => {
224                                if !field_value.contains('\0') {
225                                    self.last_event_id_buffer = field_value.to_string();
226                                }
227                            }
228                            "retry" => {
229                                // For simplicity, we'll ignore retry in this implementation
230                            }
231                            _ => {
232                                // Ignore unknown fields
233                            }
234                        }
235                    }
236                }
237                None => {
238                    // No more data available at the moment
239                    return Ok(None);
240                }
241            }
242        }
243    }
244
245    fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
246        // Attempt to process any remaining data
247        let result = self.decode(src)?;
248        if result.is_some() {
249            return Ok(result);
250        }
251        // If there's no data left to process, return None
252        if self.data_buffer.is_empty()
253            && self.event_type_buffer.is_empty()
254            && self.last_event_id_buffer.is_empty()
255            && self.comments_buffer.is_empty()
256        {
257            Ok(None)
258        } else {
259            // Dispatch any remaining data as an event
260            if self.data_buffer.ends_with('\n') {
261                self.data_buffer.pop();
262            }
263
264            let data = if !self.data_buffer.is_empty() {
265                Some(std::mem::take(&mut self.data_buffer))
266            } else {
267                None
268            };
269
270            let message = Message {
271                id: if self.last_event_id_buffer.is_empty() {
272                    None
273                } else {
274                    Some(std::mem::take(&mut self.last_event_id_buffer))
275                },
276                event: if self.event_type_buffer.is_empty() {
277                    None
278                } else {
279                    Some(std::mem::take(&mut self.event_type_buffer))
280                },
281                data,
282                comments: if self.comments_buffer.is_empty() {
283                    None
284                } else {
285                    Some(std::mem::take(&mut self.comments_buffer))
286                },
287            };
288            // No need to clear the buffers; they've been replaced with empty values
289            Ok(Some(message))
290        }
291    }
292}
293
294/// Creates a stream of `Message` instances from a text stream of SSE events.
295pub fn create_message_stream(
296    text: &str,
297) -> Pin<Box<dyn Stream<Item = Result<Message, SseCodecError>> + Send + Sync>> {
298    let cursor = Cursor::new(text.to_string());
299    let framed = FramedRead::new(cursor, SseLineCodec::new());
300    Box::pin(framed)
301}
302
303#[cfg(test)]
304mod tests {
305    use std::io::Cursor;
306
307    use futures::stream::StreamExt;
308    use tokio_util::codec::FramedRead;
309
310    use super::*;
311
312    #[derive(Deserialize, Debug, PartialEq)]
313    struct TestData {
314        message: String,
315    }
316
317    #[tokio::test]
318    async fn test_message_with_all_fields() {
319        let sample_data = r#"id: 123
320event: test
321data: {"message": "Hello World"}
322: This is a comment
323
324"#;
325        let cursor = Cursor::new(sample_data);
326        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
327
328        if let Some(Ok(message)) = framed.next().await {
329            assert_eq!(message.id, Some("123".to_string()));
330            assert_eq!(message.event, Some("test".to_string()));
331            assert_eq!(
332                message.comments,
333                Some(vec!["This is a comment".to_string()])
334            );
335            let data: TestData = message.decode_data().unwrap();
336            assert_eq!(data.message, "Hello World".to_string());
337        } else {
338            panic!("Expected a message");
339        }
340    }
341
342    #[tokio::test]
343    async fn test_message_with_only_data() {
344        let sample_data = r#"data: {"message": "Just some data"}
345
346"#;
347        let cursor = Cursor::new(sample_data);
348        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
349
350        if let Some(Ok(message)) = framed.next().await {
351            assert!(message.id.is_none());
352            assert!(message.event.is_none());
353            assert!(message.comments.is_none());
354            let data: TestData = message.decode_data().unwrap();
355            assert_eq!(data.message, "Just some data".to_string());
356        } else {
357            panic!("Expected a message");
358        }
359    }
360
361    #[tokio::test]
362    async fn test_message_with_only_comment() {
363        let sample_data = r#": This is a comment
364
365"#;
366        let cursor = Cursor::new(sample_data);
367        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
368
369        if let Some(Ok(message)) = framed.next().await {
370            assert!(message.id.is_none());
371            assert!(message.event.is_none());
372            assert!(message.data.is_none());
373            assert_eq!(
374                message.comments,
375                Some(vec!["This is a comment".to_string()])
376            );
377        } else {
378            panic!("Expected a message");
379        }
380    }
381
382    #[tokio::test]
383    async fn test_message_with_multiple_comments() {
384        let sample_data = r#": First comment
385: Second comment
386
387"#;
388        let cursor = Cursor::new(sample_data);
389        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
390
391        if let Some(Ok(message)) = framed.next().await {
392            assert!(message.id.is_none());
393            assert!(message.event.is_none());
394            assert!(message.data.is_none());
395            assert_eq!(
396                message.comments,
397                Some(vec![
398                    "First comment".to_string(),
399                    "Second comment".to_string()
400                ])
401            );
402        } else {
403            panic!("Expected a message");
404        }
405    }
406
407    #[tokio::test]
408    async fn test_message_with_partial_fields() {
409        let sample_data = r#"id: 456
410data: {"message": "Partial data"}
411
412"#;
413        let cursor = Cursor::new(sample_data);
414        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
415
416        if let Some(Ok(message)) = framed.next().await {
417            assert_eq!(message.id, Some("456".to_string()));
418            assert!(message.event.is_none());
419            assert!(message.comments.is_none());
420            let data: TestData = message.decode_data().unwrap();
421            assert_eq!(data.message, "Partial data".to_string());
422        } else {
423            panic!("Expected a message");
424        }
425    }
426
427    #[tokio::test]
428    async fn test_message_with_invalid_json_data() {
429        let sample_data = r#"data: {"message": "Invalid JSON
430
431"#;
432        let cursor = Cursor::new(sample_data);
433        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
434
435        if let Some(result) = framed.next().await {
436            match result {
437                Ok(message) => {
438                    // got a message, but it has invalid json
439                    let data = message.decode_data::<TestData>();
440                    assert!(data.is_err(), "Expected an error; got {:?}", data);
441                }
442                _ => panic!("Expected a message"),
443            }
444        } else {
445            panic!("Expected an error");
446        }
447    }
448
449    #[tokio::test]
450    async fn test_message_with_missing_data_field() {
451        let sample_data = r#"id: 789
452event: test_event
453
454"#;
455        let cursor = Cursor::new(sample_data);
456        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
457
458        if let Some(Ok(message)) = framed.next().await {
459            assert_eq!(message.id, Some("789".to_string()));
460            assert_eq!(message.event, Some("test_event".to_string()));
461            assert!(message.data.is_none());
462            assert!(message.comments.is_none());
463        } else {
464            panic!("Expected a message");
465        }
466    }
467
468    #[tokio::test]
469    async fn test_message_with_empty_data_field() {
470        let sample_data = r#"data:
471
472"#;
473        let cursor = Cursor::new(sample_data);
474        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
475
476        if let Some(result) = framed.next().await {
477            match result {
478                Ok(_) => {
479                    panic!("Expected no message");
480                }
481                Err(e) => panic!("Unexpected error: {}", e),
482            }
483        } else {
484            // no message is emitted
485        }
486    }
487
488    #[tokio::test]
489    async fn test_message_with_multiple_data_lines() {
490        let sample_data = r#"data: {"message": "Line1"}
491data: {"message": "Line2"}
492
493"#;
494        let cursor = Cursor::new(sample_data);
495        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
496
497        if let Some(result) = framed.next().await {
498            match result {
499                Ok(message) => {
500                    // got a message with data, but the data is junk
501                    let data = message.decode_data::<TestData>();
502                    assert!(data.is_err(), "Expected an error; got {:?}", data);
503                }
504                _ => panic!("Expected a message"),
505            }
506        } else {
507            panic!("Expected an error");
508        }
509    }
510
511    #[tokio::test]
512    async fn test_message_with_unrecognized_field() {
513        let sample_data = r#"unknown: value
514data: {"message": "Hello"}
515
516"#;
517        let cursor = Cursor::new(sample_data);
518        let mut framed = FramedRead::new(cursor, SseLineCodec::new());
519
520        if let Some(Ok(message)) = framed.next().await {
521            // Unrecognized fields are ignored
522            assert!(message.id.is_none());
523            assert!(message.event.is_none());
524            assert!(message.comments.is_none());
525            let data: TestData = message.decode_data().unwrap();
526            assert_eq!(data.message, "Hello".to_string());
527        } else {
528            panic!("Expected a message");
529        }
530    }
531
532    // data recorded on 2024-09-30 from
533    // + curl https://integrate.api.nvidia.com/v1/chat/completions -H 'Content-Type: application/json' \
534    //     -H 'Authorization: Bearer nvapi-<redacted>' -d '{
535    //     "model": "mistralai/mixtral-8x22b-instruct-v0.1",
536    //     "messages": [{"role":"user","content":"Write a limerick about the wonders of GPU computing."}],
537    //     "temperature": 0.5,
538    //     "top_p": 1,
539    //     "max_tokens": 64,
540    //     "stream": true
541    //   }'
542    const SAMPLE_CHAT_DATA: &str = r#"
543data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":"assistant","content":null},"logprobs":null,"finish_reason":null}]}
544
545data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"A"},"logprobs":null,"finish_reason":null}]}
546
547data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" GPU"},"logprobs":null,"finish_reason":null}]}
548
549data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
550
551data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" swift"},"logprobs":null,"finish_reason":null}]}
552
553data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" and"},"logprobs":null,"finish_reason":null}]}
554
555data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
556
557data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" clever"},"logprobs":null,"finish_reason":null}]}
558
559data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
560
561data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
562
563data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"In"},"logprobs":null,"finish_reason":null}]}
564
565data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" comput"},"logprobs":null,"finish_reason":null}]}
566
567data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ations"},"logprobs":null,"finish_reason":null}]}
568
569data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
570
571data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"'"},"logprobs":null,"finish_reason":null}]}
572
573data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"s"},"logprobs":null,"finish_reason":null}]}
574
575data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" quite"},"logprobs":null,"finish_reason":null}]}
576
577data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" the"},"logprobs":null,"finish_reason":null}]}
578
579data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ende"},"logprobs":null,"finish_reason":null}]}
580
581data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"avor"},"logprobs":null,"finish_reason":null}]}
582
583data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"."},"logprobs":null,"finish_reason":null}]}
584
585data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
586
587data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"With"},"logprobs":null,"finish_reason":null}]}
588
589data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" its"},"logprobs":null,"finish_reason":null}]}
590
591data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" thousands"},"logprobs":null,"finish_reason":null}]}
592
593data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" of"},"logprobs":null,"finish_reason":null}]}
594
595data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" co"},"logprobs":null,"finish_reason":null}]}
596
597data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"res"},"logprobs":null,"finish_reason":null}]}
598
599data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
600
601data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
602
603data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"On"},"logprobs":null,"finish_reason":null}]}
604
605data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" complex"},"logprobs":null,"finish_reason":null}]}
606
607data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" tasks"},"logprobs":null,"finish_reason":null}]}
608
609data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
610
611data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ro"},"logprobs":null,"finish_reason":null}]}
612
613data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ars"},"logprobs":null,"finish_reason":null}]}
614
615data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
616
617data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
618
619data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"S"},"logprobs":null,"finish_reason":null}]}
620
621data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"olving"},"logprobs":null,"finish_reason":null}]}
622
623data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" problems"},"logprobs":null,"finish_reason":null}]}
624
625data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" like"},"logprobs":null,"finish_reason":null}]}
626
627data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" never"},"logprobs":null,"finish_reason":null}]}
628
629data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
630
631data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" forever"},"logprobs":null,"finish_reason":null}]}
632
633data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"!"},"logprobs":null,"finish_reason":null}]}
634
635data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":""},"logprobs":null,"finish_reason":"stop","stop_reason":null}]}
636
637data: [DONE]
638
639"#;
640
641    #[tokio::test]
642    async fn test_openai_chat_stream() {
643        use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
644
645        // let cursor = Cursor::new(SAMPLE_CHAT_DATA);
646        // let mut framed = FramedRead::new(cursor, SseLineCodec::new());
647
648        let mut stream = create_message_stream(SAMPLE_CHAT_DATA);
649
650        let mut counter = 0;
651
652        loop {
653            match stream.next().await {
654                Some(Ok(message)) => {
655                    let delta: NvCreateChatCompletionStreamResponse =
656                        serde_json::from_str(&message.data.unwrap()).unwrap();
657                    counter += 1;
658                    println!("counter: {}", counter);
659                    println!("delta: {:?}", delta);
660                }
661                Some(Err(e)) => {
662                    panic!("Error: {:?}", e);
663                }
664                None => {
665                    break;
666                }
667            }
668        }
669
670        assert_eq!(counter, 47);
671    }
672
673    #[test]
674    fn test_successful_conversion() {
675        let message = Message {
676            id: Some("123".to_string()),
677            event: Some("update".to_string()),
678            data: Some(r#"{"message": "Hello World"}"#.to_string()),
679            comments: Some(vec!["Some comment".to_string()]),
680        };
681
682        let annotated: Annotated<TestData> = message.try_into().unwrap();
683
684        assert_eq!(annotated.id, Some("123".to_string()));
685        assert_eq!(annotated.event, Some("update".to_string()));
686        assert_eq!(annotated.comment, Some(vec!["Some comment".to_string()]));
687        assert_eq!(
688            annotated.data,
689            Some(TestData {
690                message: "Hello World".to_string()
691            })
692        );
693    }
694
695    #[test]
696    fn test_error_event_with_comments() {
697        let message = Message {
698            id: Some("456".to_string()),
699            event: Some("error".to_string()),
700            data: Some("Error data".to_string()),
701            comments: Some(vec!["An error occurred".to_string()]),
702        };
703
704        let result: Result<Annotated<TestData>, _> = message.try_into();
705
706        assert!(result.is_err());
707        assert_eq!(result.unwrap_err(), "An error occurred".to_string());
708    }
709
710    #[test]
711    fn test_error_event_without_comments() {
712        let message = Message {
713            id: Some("789".to_string()),
714            event: Some("error".to_string()),
715            data: Some("Error data".to_string()),
716            comments: None,
717        };
718
719        let result: Result<Annotated<TestData>, _> = message.try_into();
720
721        assert!(result.is_err());
722    }
723
724    #[test]
725    fn test_invalid_json_data() {
726        let message = Message {
727            id: None,
728            event: Some("update".to_string()),
729            data: Some("Invalid JSON".to_string()),
730            comments: None,
731        };
732
733        let result: Result<Annotated<TestData>, _> = message.try_into();
734
735        assert!(result.is_err());
736    }
737
738    #[test]
739    fn test_missing_data_field() {
740        let message = Message {
741            id: None,
742            event: Some("update".to_string()),
743            data: None,
744            comments: None,
745        };
746
747        let result: Result<Annotated<TestData>, _> = message.try_into();
748
749        assert!(result.is_ok());
750        let annotated = result.unwrap();
751        assert!(annotated.data.is_none());
752        assert_eq!(annotated.event, Some("update".to_string()));
753    }
754}