1use std::{io::Cursor, pin::Pin};
15
16use bytes::BytesMut;
17use futures::Stream;
18use serde::Deserialize;
19use tokio_util::codec::{Decoder, FramedRead, LinesCodec};
20
21use super::Annotated;
22
23#[derive(Debug, thiserror::Error)]
25pub enum SseCodecError {
26 #[error("SseLineCodec decode error: {0}")]
27 DecodeError(String),
28
29 #[error("IO error: {0}")]
30 IoError(#[from] std::io::Error),
31}
32
33pub struct SseLineCodec {
42 lines_codec: LinesCodec,
43 data_buffer: String,
44 event_type_buffer: String,
45 last_event_id_buffer: String,
46 comments_buffer: Vec<String>,
47}
48
49#[derive(Debug)]
57pub struct Message {
58 pub id: Option<String>,
59 pub event: Option<String>,
60 pub data: Option<String>,
61 pub comments: Option<Vec<String>>,
62}
63
64impl Message {
65 pub fn decode_data<T>(&self) -> Result<T, SseCodecError>
71 where
72 T: for<'de> Deserialize<'de>,
73 {
74 serde_json::from_str(self.data.as_ref().ok_or(SseCodecError::DecodeError(
75 "no data: message to decode".to_string(),
76 ))?)
77 .map_err(|e| SseCodecError::DecodeError(format!("failed to deserialized data: {}", e)))
78 }
79}
80
81impl<T> TryFrom<Message> for Annotated<T>
82where
83 T: for<'de> Deserialize<'de>,
84{
85 type Error = String;
86
87 fn try_from(value: Message) -> Result<Annotated<T>, Self::Error> {
88 if let Some(event) = value.event.as_ref()
90 && event == "error"
91 {
92 let message = match &value.comments {
93 Some(comments) => comments.join("\n"),
94 None => "`event: error` detected, but no error message found".to_string(),
95 };
96 return Err(message);
97 }
98
99 let data: Option<T> = match &value.data {
102 Some(_) => value.decode_data().map_err(|e| e.to_string())?,
103 None => None,
104 };
105
106 Ok(Annotated {
107 data,
108 id: value.id,
109 event: value.event,
110 comment: value.comments,
111 })
112 }
113}
114
115impl SseLineCodec {
116 pub fn new() -> Self {
118 Self::default()
119 }
120}
121
122impl Default for SseLineCodec {
123 fn default() -> Self {
124 Self {
125 lines_codec: LinesCodec::new(),
126 data_buffer: String::new(),
127 event_type_buffer: String::new(),
128 last_event_id_buffer: String::new(),
129 comments_buffer: Vec::new(),
130 }
131 }
132}
133
134impl Decoder for SseLineCodec {
135 type Item = Message;
136 type Error = SseCodecError;
137
138 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
139 loop {
140 match self
141 .lines_codec
142 .decode(src)
143 .map_err(|e| SseCodecError::DecodeError(e.to_string()))?
144 {
145 Some(line) => {
146 let line = line.trim_end_matches(&['\r', '\n'][..]);
147 if line.is_empty() {
148 if !self.data_buffer.is_empty()
150 || !self.event_type_buffer.is_empty()
151 || !self.last_event_id_buffer.is_empty()
152 || !self.comments_buffer.is_empty()
153 {
154 if self.data_buffer.ends_with('\n') {
156 self.data_buffer.pop();
157 }
158
159 let data = if !self.data_buffer.is_empty() {
160 Some(std::mem::take(&mut self.data_buffer))
161 } else {
162 None
163 };
164
165 let message = Message {
166 id: if self.last_event_id_buffer.is_empty() {
167 None
168 } else {
169 Some(std::mem::take(&mut self.last_event_id_buffer))
170 },
171 event: if self.event_type_buffer.is_empty() {
172 None
173 } else {
174 Some(std::mem::take(&mut self.event_type_buffer))
175 },
176 data,
177 comments: if self.comments_buffer.is_empty() {
178 None
179 } else {
180 Some(std::mem::take(&mut self.comments_buffer))
181 },
182 };
183 return Ok(Some(message));
185 } else {
186 continue;
188 }
189 } else if let Some(comment) = line.strip_prefix(':') {
190 self.comments_buffer.push(comment.trim().into());
191 } else {
192 let (field_name, field_value) = if let Some(idx) = line.find(':') {
193 let (name, value) = line.split_at(idx);
194 let value = value[1..].trim_start_matches(' ');
195 (name, value)
196 } else {
197 (line, "")
198 };
199
200 match field_name {
201 "event" => {
202 self.event_type_buffer = field_value.to_string();
203 }
204 "data" => {
205 if field_value != "[DONE]" {
206 if !self.data_buffer.is_empty() {
207 self.data_buffer.push('\n');
208 }
209 self.data_buffer.push_str(field_value);
210 }
211 }
212 "id" => {
213 if !field_value.contains('\0') {
214 self.last_event_id_buffer = field_value.to_string();
215 }
216 }
217 "retry" => {
218 }
220 _ => {
221 }
223 }
224 }
225 }
226 None => {
227 return Ok(None);
229 }
230 }
231 }
232 }
233
234 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
235 let result = self.decode(src)?;
237 if result.is_some() {
238 return Ok(result);
239 }
240 if self.data_buffer.is_empty()
242 && self.event_type_buffer.is_empty()
243 && self.last_event_id_buffer.is_empty()
244 && self.comments_buffer.is_empty()
245 {
246 Ok(None)
247 } else {
248 if self.data_buffer.ends_with('\n') {
250 self.data_buffer.pop();
251 }
252
253 let data = if !self.data_buffer.is_empty() {
254 Some(std::mem::take(&mut self.data_buffer))
255 } else {
256 None
257 };
258
259 let message = Message {
260 id: if self.last_event_id_buffer.is_empty() {
261 None
262 } else {
263 Some(std::mem::take(&mut self.last_event_id_buffer))
264 },
265 event: if self.event_type_buffer.is_empty() {
266 None
267 } else {
268 Some(std::mem::take(&mut self.event_type_buffer))
269 },
270 data,
271 comments: if self.comments_buffer.is_empty() {
272 None
273 } else {
274 Some(std::mem::take(&mut self.comments_buffer))
275 },
276 };
277 Ok(Some(message))
279 }
280 }
281}
282
283pub fn create_message_stream(
285 text: &str,
286) -> Pin<Box<dyn Stream<Item = Result<Message, SseCodecError>> + Send + Sync>> {
287 let cursor = Cursor::new(text.to_string());
288 let framed = FramedRead::new(cursor, SseLineCodec::new());
289 Box::pin(framed)
290}
291
292#[cfg(test)]
293mod tests {
294 use std::io::Cursor;
295
296 use futures::stream::StreamExt;
297 use tokio_util::codec::FramedRead;
298
299 use super::*;
300
301 #[derive(Deserialize, Debug, PartialEq)]
302 struct TestData {
303 message: String,
304 }
305
306 #[tokio::test]
307 async fn test_message_with_all_fields() {
308 let sample_data = r#"id: 123
309event: test
310data: {"message": "Hello World"}
311: This is a comment
312
313"#;
314 let cursor = Cursor::new(sample_data);
315 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
316
317 if let Some(Ok(message)) = framed.next().await {
318 assert_eq!(message.id, Some("123".to_string()));
319 assert_eq!(message.event, Some("test".to_string()));
320 assert_eq!(
321 message.comments,
322 Some(vec!["This is a comment".to_string()])
323 );
324 let data: TestData = message.decode_data().unwrap();
325 assert_eq!(data.message, "Hello World".to_string());
326 } else {
327 panic!("Expected a message");
328 }
329 }
330
331 #[tokio::test]
332 async fn test_message_with_only_data() {
333 let sample_data = r#"data: {"message": "Just some data"}
334
335"#;
336 let cursor = Cursor::new(sample_data);
337 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
338
339 if let Some(Ok(message)) = framed.next().await {
340 assert!(message.id.is_none());
341 assert!(message.event.is_none());
342 assert!(message.comments.is_none());
343 let data: TestData = message.decode_data().unwrap();
344 assert_eq!(data.message, "Just some data".to_string());
345 } else {
346 panic!("Expected a message");
347 }
348 }
349
350 #[tokio::test]
351 async fn test_message_with_only_comment() {
352 let sample_data = r#": This is a comment
353
354"#;
355 let cursor = Cursor::new(sample_data);
356 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
357
358 if let Some(Ok(message)) = framed.next().await {
359 assert!(message.id.is_none());
360 assert!(message.event.is_none());
361 assert!(message.data.is_none());
362 assert_eq!(
363 message.comments,
364 Some(vec!["This is a comment".to_string()])
365 );
366 } else {
367 panic!("Expected a message");
368 }
369 }
370
371 #[tokio::test]
372 async fn test_message_with_multiple_comments() {
373 let sample_data = r#": First comment
374: Second comment
375
376"#;
377 let cursor = Cursor::new(sample_data);
378 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
379
380 if let Some(Ok(message)) = framed.next().await {
381 assert!(message.id.is_none());
382 assert!(message.event.is_none());
383 assert!(message.data.is_none());
384 assert_eq!(
385 message.comments,
386 Some(vec![
387 "First comment".to_string(),
388 "Second comment".to_string()
389 ])
390 );
391 } else {
392 panic!("Expected a message");
393 }
394 }
395
396 #[tokio::test]
397 async fn test_message_with_partial_fields() {
398 let sample_data = r#"id: 456
399data: {"message": "Partial data"}
400
401"#;
402 let cursor = Cursor::new(sample_data);
403 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
404
405 if let Some(Ok(message)) = framed.next().await {
406 assert_eq!(message.id, Some("456".to_string()));
407 assert!(message.event.is_none());
408 assert!(message.comments.is_none());
409 let data: TestData = message.decode_data().unwrap();
410 assert_eq!(data.message, "Partial data".to_string());
411 } else {
412 panic!("Expected a message");
413 }
414 }
415
416 #[tokio::test]
417 async fn test_message_with_invalid_json_data() {
418 let sample_data = r#"data: {"message": "Invalid JSON
419
420"#;
421 let cursor = Cursor::new(sample_data);
422 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
423
424 if let Some(result) = framed.next().await {
425 match result {
426 Ok(message) => {
427 let data = message.decode_data::<TestData>();
429 assert!(data.is_err(), "Expected an error; got {:?}", data);
430 }
431 _ => panic!("Expected a message"),
432 }
433 } else {
434 panic!("Expected an error");
435 }
436 }
437
438 #[tokio::test]
439 async fn test_message_with_missing_data_field() {
440 let sample_data = r#"id: 789
441event: test_event
442
443"#;
444 let cursor = Cursor::new(sample_data);
445 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
446
447 if let Some(Ok(message)) = framed.next().await {
448 assert_eq!(message.id, Some("789".to_string()));
449 assert_eq!(message.event, Some("test_event".to_string()));
450 assert!(message.data.is_none());
451 assert!(message.comments.is_none());
452 } else {
453 panic!("Expected a message");
454 }
455 }
456
457 #[tokio::test]
458 async fn test_message_with_empty_data_field() {
459 let sample_data = r#"data:
460
461"#;
462 let cursor = Cursor::new(sample_data);
463 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
464
465 if let Some(result) = framed.next().await {
466 match result {
467 Ok(_) => {
468 panic!("Expected no message");
469 }
470 Err(e) => panic!("Unexpected error: {}", e),
471 }
472 } else {
473 }
475 }
476
477 #[tokio::test]
478 async fn test_message_with_multiple_data_lines() {
479 let sample_data = r#"data: {"message": "Line1"}
480data: {"message": "Line2"}
481
482"#;
483 let cursor = Cursor::new(sample_data);
484 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
485
486 if let Some(result) = framed.next().await {
487 match result {
488 Ok(message) => {
489 let data = message.decode_data::<TestData>();
491 assert!(data.is_err(), "Expected an error; got {:?}", data);
492 }
493 _ => panic!("Expected a message"),
494 }
495 } else {
496 panic!("Expected an error");
497 }
498 }
499
500 #[tokio::test]
501 async fn test_message_with_unrecognized_field() {
502 let sample_data = r#"unknown: value
503data: {"message": "Hello"}
504
505"#;
506 let cursor = Cursor::new(sample_data);
507 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
508
509 if let Some(Ok(message)) = framed.next().await {
510 assert!(message.id.is_none());
512 assert!(message.event.is_none());
513 assert!(message.comments.is_none());
514 let data: TestData = message.decode_data().unwrap();
515 assert_eq!(data.message, "Hello".to_string());
516 } else {
517 panic!("Expected a message");
518 }
519 }
520
521 const SAMPLE_CHAT_DATA: &str = r#"
532data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":"assistant","content":null},"logprobs":null,"finish_reason":null}]}
533
534data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"A"},"logprobs":null,"finish_reason":null}]}
535
536data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" GPU"},"logprobs":null,"finish_reason":null}]}
537
538data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
539
540data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" swift"},"logprobs":null,"finish_reason":null}]}
541
542data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" and"},"logprobs":null,"finish_reason":null}]}
543
544data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
545
546data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" clever"},"logprobs":null,"finish_reason":null}]}
547
548data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
549
550data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
551
552data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"In"},"logprobs":null,"finish_reason":null}]}
553
554data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" comput"},"logprobs":null,"finish_reason":null}]}
555
556data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ations"},"logprobs":null,"finish_reason":null}]}
557
558data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
559
560data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"'"},"logprobs":null,"finish_reason":null}]}
561
562data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"s"},"logprobs":null,"finish_reason":null}]}
563
564data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" quite"},"logprobs":null,"finish_reason":null}]}
565
566data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" the"},"logprobs":null,"finish_reason":null}]}
567
568data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ende"},"logprobs":null,"finish_reason":null}]}
569
570data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"avor"},"logprobs":null,"finish_reason":null}]}
571
572data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"."},"logprobs":null,"finish_reason":null}]}
573
574data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
575
576data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"With"},"logprobs":null,"finish_reason":null}]}
577
578data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" its"},"logprobs":null,"finish_reason":null}]}
579
580data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" thousands"},"logprobs":null,"finish_reason":null}]}
581
582data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" of"},"logprobs":null,"finish_reason":null}]}
583
584data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" co"},"logprobs":null,"finish_reason":null}]}
585
586data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"res"},"logprobs":null,"finish_reason":null}]}
587
588data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
589
590data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
591
592data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"On"},"logprobs":null,"finish_reason":null}]}
593
594data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" complex"},"logprobs":null,"finish_reason":null}]}
595
596data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" tasks"},"logprobs":null,"finish_reason":null}]}
597
598data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
599
600data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ro"},"logprobs":null,"finish_reason":null}]}
601
602data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ars"},"logprobs":null,"finish_reason":null}]}
603
604data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
605
606data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
607
608data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"S"},"logprobs":null,"finish_reason":null}]}
609
610data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"olving"},"logprobs":null,"finish_reason":null}]}
611
612data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" problems"},"logprobs":null,"finish_reason":null}]}
613
614data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" like"},"logprobs":null,"finish_reason":null}]}
615
616data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" never"},"logprobs":null,"finish_reason":null}]}
617
618data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
619
620data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" forever"},"logprobs":null,"finish_reason":null}]}
621
622data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"!"},"logprobs":null,"finish_reason":null}]}
623
624data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":""},"logprobs":null,"finish_reason":"stop","stop_reason":null}]}
625
626data: [DONE]
627
628"#;
629
630 #[tokio::test]
631 async fn test_openai_chat_stream() {
632 use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
633
634 let mut stream = create_message_stream(SAMPLE_CHAT_DATA);
638
639 let mut counter = 0;
640
641 loop {
642 match stream.next().await {
643 Some(Ok(message)) => {
644 let delta: NvCreateChatCompletionStreamResponse =
645 serde_json::from_str(&message.data.unwrap()).unwrap();
646 counter += 1;
647 println!("counter: {}", counter);
648 println!("delta: {:?}", delta);
649 }
650 Some(Err(e)) => {
651 panic!("Error: {:?}", e);
652 }
653 None => {
654 break;
655 }
656 }
657 }
658
659 assert_eq!(counter, 47);
660 }
661
662 #[test]
663 fn test_successful_conversion() {
664 let message = Message {
665 id: Some("123".to_string()),
666 event: Some("update".to_string()),
667 data: Some(r#"{"message": "Hello World"}"#.to_string()),
668 comments: Some(vec!["Some comment".to_string()]),
669 };
670
671 let annotated: Annotated<TestData> = message.try_into().unwrap();
672
673 assert_eq!(annotated.id, Some("123".to_string()));
674 assert_eq!(annotated.event, Some("update".to_string()));
675 assert_eq!(annotated.comment, Some(vec!["Some comment".to_string()]));
676 assert_eq!(
677 annotated.data,
678 Some(TestData {
679 message: "Hello World".to_string()
680 })
681 );
682 }
683
684 #[test]
685 fn test_error_event_with_comments() {
686 let message = Message {
687 id: Some("456".to_string()),
688 event: Some("error".to_string()),
689 data: Some("Error data".to_string()),
690 comments: Some(vec!["An error occurred".to_string()]),
691 };
692
693 let result: Result<Annotated<TestData>, _> = message.try_into();
694
695 assert!(result.is_err());
696 assert_eq!(result.unwrap_err(), "An error occurred".to_string());
697 }
698
699 #[test]
700 fn test_error_event_without_comments() {
701 let message = Message {
702 id: Some("789".to_string()),
703 event: Some("error".to_string()),
704 data: Some("Error data".to_string()),
705 comments: None,
706 };
707
708 let result: Result<Annotated<TestData>, _> = message.try_into();
709
710 assert!(result.is_err());
711 }
712
713 #[test]
714 fn test_invalid_json_data() {
715 let message = Message {
716 id: None,
717 event: Some("update".to_string()),
718 data: Some("Invalid JSON".to_string()),
719 comments: None,
720 };
721
722 let result: Result<Annotated<TestData>, _> = message.try_into();
723
724 assert!(result.is_err());
725 }
726
727 #[test]
728 fn test_missing_data_field() {
729 let message = Message {
730 id: None,
731 event: Some("update".to_string()),
732 data: None,
733 comments: None,
734 };
735
736 let result: Result<Annotated<TestData>, _> = message.try_into();
737
738 assert!(result.is_ok());
739 let annotated = result.unwrap();
740 assert!(annotated.data.is_none());
741 assert_eq!(annotated.event, Some("update".to_string()));
742 }
743}