1use bytes::BytesMut;
27use futures::Stream;
28use serde::Deserialize;
29use std::{io::Cursor, pin::Pin};
30use tokio_util::codec::{Decoder, FramedRead, LinesCodec};
31
32use super::Annotated;
33
34#[derive(Debug, thiserror::Error)]
36pub enum SseCodecError {
37 #[error("SseLineCodec decode error: {0}")]
38 DecodeError(String),
39
40 #[error("IO error: {0}")]
41 IoError(#[from] std::io::Error),
42}
43
44pub struct SseLineCodec {
53 lines_codec: LinesCodec,
54 data_buffer: String,
55 event_type_buffer: String,
56 last_event_id_buffer: String,
57 comments_buffer: Vec<String>,
58}
59
60#[derive(Debug)]
68pub struct Message {
69 pub id: Option<String>,
70 pub event: Option<String>,
71 pub data: Option<String>,
72 pub comments: Option<Vec<String>>,
73}
74
75impl Message {
76 pub fn decode_data<T>(&self) -> Result<T, SseCodecError>
82 where
83 T: for<'de> Deserialize<'de>,
84 {
85 serde_json::from_str(self.data.as_ref().ok_or(SseCodecError::DecodeError(
86 "no data: message to decode".to_string(),
87 ))?)
88 .map_err(|e| SseCodecError::DecodeError(format!("failed to deserialized data: {}", e)))
89 }
90}
91
92impl<T> TryFrom<Message> for Annotated<T>
93where
94 T: for<'de> Deserialize<'de>,
95{
96 type Error = String;
97
98 fn try_from(value: Message) -> Result<Annotated<T>, Self::Error> {
99 if let Some(event) = value.event.as_ref() {
101 if event == "error" {
102 let message = match &value.comments {
103 Some(comments) => comments.join("\n"),
104 None => "`event: error` detected, but no error message found".to_string(),
105 };
106 return Err(message);
107 }
108 }
109
110 let data: Option<T> = match &value.data {
113 Some(_) => value.decode_data().map_err(|e| e.to_string())?,
114 None => None,
115 };
116
117 Ok(Annotated {
118 data,
119 id: value.id,
120 event: value.event,
121 comment: value.comments,
122 })
123 }
124}
125
126impl SseLineCodec {
127 pub fn new() -> Self {
129 Self::default()
130 }
131}
132
133impl Default for SseLineCodec {
134 fn default() -> Self {
135 Self {
136 lines_codec: LinesCodec::new(),
137 data_buffer: String::new(),
138 event_type_buffer: String::new(),
139 last_event_id_buffer: String::new(),
140 comments_buffer: Vec::new(),
141 }
142 }
143}
144
145impl Decoder for SseLineCodec {
146 type Item = Message;
147 type Error = SseCodecError;
148
149 fn decode(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
150 loop {
151 match self
152 .lines_codec
153 .decode(src)
154 .map_err(|e| SseCodecError::DecodeError(e.to_string()))?
155 {
156 Some(line) => {
157 let line = line.trim_end_matches(&['\r', '\n'][..]);
158 if line.is_empty() {
159 if !self.data_buffer.is_empty()
161 || !self.event_type_buffer.is_empty()
162 || !self.last_event_id_buffer.is_empty()
163 || !self.comments_buffer.is_empty()
164 {
165 if self.data_buffer.ends_with('\n') {
167 self.data_buffer.pop();
168 }
169
170 let data = if !self.data_buffer.is_empty() {
171 Some(std::mem::take(&mut self.data_buffer))
172 } else {
173 None
174 };
175
176 let message = Message {
177 id: if self.last_event_id_buffer.is_empty() {
178 None
179 } else {
180 Some(std::mem::take(&mut self.last_event_id_buffer))
181 },
182 event: if self.event_type_buffer.is_empty() {
183 None
184 } else {
185 Some(std::mem::take(&mut self.event_type_buffer))
186 },
187 data,
188 comments: if self.comments_buffer.is_empty() {
189 None
190 } else {
191 Some(std::mem::take(&mut self.comments_buffer))
192 },
193 };
194 return Ok(Some(message));
196 } else {
197 continue;
199 }
200 } else if let Some(comment) = line.strip_prefix(':') {
201 self.comments_buffer.push(comment.trim().into());
202 } else {
203 let (field_name, field_value) = if let Some(idx) = line.find(':') {
204 let (name, value) = line.split_at(idx);
205 let value = value[1..].trim_start_matches(' ');
206 (name, value)
207 } else {
208 (line, "")
209 };
210
211 match field_name {
212 "event" => {
213 self.event_type_buffer = field_value.to_string();
214 }
215 "data" => {
216 if field_value != "[DONE]" {
217 if !self.data_buffer.is_empty() {
218 self.data_buffer.push('\n');
219 }
220 self.data_buffer.push_str(field_value);
221 }
222 }
223 "id" => {
224 if !field_value.contains('\0') {
225 self.last_event_id_buffer = field_value.to_string();
226 }
227 }
228 "retry" => {
229 }
231 _ => {
232 }
234 }
235 }
236 }
237 None => {
238 return Ok(None);
240 }
241 }
242 }
243 }
244
245 fn decode_eof(&mut self, src: &mut BytesMut) -> Result<Option<Self::Item>, Self::Error> {
246 let result = self.decode(src)?;
248 if result.is_some() {
249 return Ok(result);
250 }
251 if self.data_buffer.is_empty()
253 && self.event_type_buffer.is_empty()
254 && self.last_event_id_buffer.is_empty()
255 && self.comments_buffer.is_empty()
256 {
257 Ok(None)
258 } else {
259 if self.data_buffer.ends_with('\n') {
261 self.data_buffer.pop();
262 }
263
264 let data = if !self.data_buffer.is_empty() {
265 Some(std::mem::take(&mut self.data_buffer))
266 } else {
267 None
268 };
269
270 let message = Message {
271 id: if self.last_event_id_buffer.is_empty() {
272 None
273 } else {
274 Some(std::mem::take(&mut self.last_event_id_buffer))
275 },
276 event: if self.event_type_buffer.is_empty() {
277 None
278 } else {
279 Some(std::mem::take(&mut self.event_type_buffer))
280 },
281 data,
282 comments: if self.comments_buffer.is_empty() {
283 None
284 } else {
285 Some(std::mem::take(&mut self.comments_buffer))
286 },
287 };
288 Ok(Some(message))
290 }
291 }
292}
293
294pub fn create_message_stream(
296 text: &str,
297) -> Pin<Box<dyn Stream<Item = Result<Message, SseCodecError>> + Send + Sync>> {
298 let cursor = Cursor::new(text.to_string());
299 let framed = FramedRead::new(cursor, SseLineCodec::new());
300 Box::pin(framed)
301}
302
303#[cfg(test)]
304mod tests {
305 use std::io::Cursor;
306
307 use futures::stream::StreamExt;
308 use tokio_util::codec::FramedRead;
309
310 use super::*;
311
312 #[derive(Deserialize, Debug, PartialEq)]
313 struct TestData {
314 message: String,
315 }
316
317 #[tokio::test]
318 async fn test_message_with_all_fields() {
319 let sample_data = r#"id: 123
320event: test
321data: {"message": "Hello World"}
322: This is a comment
323
324"#;
325 let cursor = Cursor::new(sample_data);
326 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
327
328 if let Some(Ok(message)) = framed.next().await {
329 assert_eq!(message.id, Some("123".to_string()));
330 assert_eq!(message.event, Some("test".to_string()));
331 assert_eq!(
332 message.comments,
333 Some(vec!["This is a comment".to_string()])
334 );
335 let data: TestData = message.decode_data().unwrap();
336 assert_eq!(data.message, "Hello World".to_string());
337 } else {
338 panic!("Expected a message");
339 }
340 }
341
342 #[tokio::test]
343 async fn test_message_with_only_data() {
344 let sample_data = r#"data: {"message": "Just some data"}
345
346"#;
347 let cursor = Cursor::new(sample_data);
348 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
349
350 if let Some(Ok(message)) = framed.next().await {
351 assert!(message.id.is_none());
352 assert!(message.event.is_none());
353 assert!(message.comments.is_none());
354 let data: TestData = message.decode_data().unwrap();
355 assert_eq!(data.message, "Just some data".to_string());
356 } else {
357 panic!("Expected a message");
358 }
359 }
360
361 #[tokio::test]
362 async fn test_message_with_only_comment() {
363 let sample_data = r#": This is a comment
364
365"#;
366 let cursor = Cursor::new(sample_data);
367 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
368
369 if let Some(Ok(message)) = framed.next().await {
370 assert!(message.id.is_none());
371 assert!(message.event.is_none());
372 assert!(message.data.is_none());
373 assert_eq!(
374 message.comments,
375 Some(vec!["This is a comment".to_string()])
376 );
377 } else {
378 panic!("Expected a message");
379 }
380 }
381
382 #[tokio::test]
383 async fn test_message_with_multiple_comments() {
384 let sample_data = r#": First comment
385: Second comment
386
387"#;
388 let cursor = Cursor::new(sample_data);
389 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
390
391 if let Some(Ok(message)) = framed.next().await {
392 assert!(message.id.is_none());
393 assert!(message.event.is_none());
394 assert!(message.data.is_none());
395 assert_eq!(
396 message.comments,
397 Some(vec![
398 "First comment".to_string(),
399 "Second comment".to_string()
400 ])
401 );
402 } else {
403 panic!("Expected a message");
404 }
405 }
406
407 #[tokio::test]
408 async fn test_message_with_partial_fields() {
409 let sample_data = r#"id: 456
410data: {"message": "Partial data"}
411
412"#;
413 let cursor = Cursor::new(sample_data);
414 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
415
416 if let Some(Ok(message)) = framed.next().await {
417 assert_eq!(message.id, Some("456".to_string()));
418 assert!(message.event.is_none());
419 assert!(message.comments.is_none());
420 let data: TestData = message.decode_data().unwrap();
421 assert_eq!(data.message, "Partial data".to_string());
422 } else {
423 panic!("Expected a message");
424 }
425 }
426
427 #[tokio::test]
428 async fn test_message_with_invalid_json_data() {
429 let sample_data = r#"data: {"message": "Invalid JSON
430
431"#;
432 let cursor = Cursor::new(sample_data);
433 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
434
435 if let Some(result) = framed.next().await {
436 match result {
437 Ok(message) => {
438 let data = message.decode_data::<TestData>();
440 assert!(data.is_err(), "Expected an error; got {:?}", data);
441 }
442 _ => panic!("Expected a message"),
443 }
444 } else {
445 panic!("Expected an error");
446 }
447 }
448
449 #[tokio::test]
450 async fn test_message_with_missing_data_field() {
451 let sample_data = r#"id: 789
452event: test_event
453
454"#;
455 let cursor = Cursor::new(sample_data);
456 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
457
458 if let Some(Ok(message)) = framed.next().await {
459 assert_eq!(message.id, Some("789".to_string()));
460 assert_eq!(message.event, Some("test_event".to_string()));
461 assert!(message.data.is_none());
462 assert!(message.comments.is_none());
463 } else {
464 panic!("Expected a message");
465 }
466 }
467
468 #[tokio::test]
469 async fn test_message_with_empty_data_field() {
470 let sample_data = r#"data:
471
472"#;
473 let cursor = Cursor::new(sample_data);
474 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
475
476 if let Some(result) = framed.next().await {
477 match result {
478 Ok(_) => {
479 panic!("Expected no message");
480 }
481 Err(e) => panic!("Unexpected error: {}", e),
482 }
483 } else {
484 }
486 }
487
488 #[tokio::test]
489 async fn test_message_with_multiple_data_lines() {
490 let sample_data = r#"data: {"message": "Line1"}
491data: {"message": "Line2"}
492
493"#;
494 let cursor = Cursor::new(sample_data);
495 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
496
497 if let Some(result) = framed.next().await {
498 match result {
499 Ok(message) => {
500 let data = message.decode_data::<TestData>();
502 assert!(data.is_err(), "Expected an error; got {:?}", data);
503 }
504 _ => panic!("Expected a message"),
505 }
506 } else {
507 panic!("Expected an error");
508 }
509 }
510
511 #[tokio::test]
512 async fn test_message_with_unrecognized_field() {
513 let sample_data = r#"unknown: value
514data: {"message": "Hello"}
515
516"#;
517 let cursor = Cursor::new(sample_data);
518 let mut framed = FramedRead::new(cursor, SseLineCodec::new());
519
520 if let Some(Ok(message)) = framed.next().await {
521 assert!(message.id.is_none());
523 assert!(message.event.is_none());
524 assert!(message.comments.is_none());
525 let data: TestData = message.decode_data().unwrap();
526 assert_eq!(data.message, "Hello".to_string());
527 } else {
528 panic!("Expected a message");
529 }
530 }
531
532 const SAMPLE_CHAT_DATA: &str = r#"
543data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":"assistant","content":null},"logprobs":null,"finish_reason":null}]}
544
545data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"A"},"logprobs":null,"finish_reason":null}]}
546
547data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" GPU"},"logprobs":null,"finish_reason":null}]}
548
549data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
550
551data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" swift"},"logprobs":null,"finish_reason":null}]}
552
553data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" and"},"logprobs":null,"finish_reason":null}]}
554
555data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" so"},"logprobs":null,"finish_reason":null}]}
556
557data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" clever"},"logprobs":null,"finish_reason":null}]}
558
559data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
560
561data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
562
563data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"In"},"logprobs":null,"finish_reason":null}]}
564
565data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" comput"},"logprobs":null,"finish_reason":null}]}
566
567data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ations"},"logprobs":null,"finish_reason":null}]}
568
569data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
570
571data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"'"},"logprobs":null,"finish_reason":null}]}
572
573data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"s"},"logprobs":null,"finish_reason":null}]}
574
575data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" quite"},"logprobs":null,"finish_reason":null}]}
576
577data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" the"},"logprobs":null,"finish_reason":null}]}
578
579data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ende"},"logprobs":null,"finish_reason":null}]}
580
581data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"avor"},"logprobs":null,"finish_reason":null}]}
582
583data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"."},"logprobs":null,"finish_reason":null}]}
584
585data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
586
587data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"With"},"logprobs":null,"finish_reason":null}]}
588
589data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" its"},"logprobs":null,"finish_reason":null}]}
590
591data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" thousands"},"logprobs":null,"finish_reason":null}]}
592
593data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" of"},"logprobs":null,"finish_reason":null}]}
594
595data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" co"},"logprobs":null,"finish_reason":null}]}
596
597data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"res"},"logprobs":null,"finish_reason":null}]}
598
599data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
600
601data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
602
603data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"On"},"logprobs":null,"finish_reason":null}]}
604
605data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" complex"},"logprobs":null,"finish_reason":null}]}
606
607data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" tasks"},"logprobs":null,"finish_reason":null}]}
608
609data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" it"},"logprobs":null,"finish_reason":null}]}
610
611data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" ro"},"logprobs":null,"finish_reason":null}]}
612
613data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"ars"},"logprobs":null,"finish_reason":null}]}
614
615data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
616
617data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"\n"},"logprobs":null,"finish_reason":null}]}
618
619data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"S"},"logprobs":null,"finish_reason":null}]}
620
621data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"olving"},"logprobs":null,"finish_reason":null}]}
622
623data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" problems"},"logprobs":null,"finish_reason":null}]}
624
625data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" like"},"logprobs":null,"finish_reason":null}]}
626
627data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" never"},"logprobs":null,"finish_reason":null}]}
628
629data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":","},"logprobs":null,"finish_reason":null}]}
630
631data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":" forever"},"logprobs":null,"finish_reason":null}]}
632
633data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":"!"},"logprobs":null,"finish_reason":null}]}
634
635data: {"id":"chat-e135180178ae4fe6a7a301aa65aaeea5","object":"chat.completion.chunk","created":1727750141,"model":"mistralai/mixtral-8x22b-instruct-v0.1","choices":[{"index":0,"delta":{"role":null,"content":""},"logprobs":null,"finish_reason":"stop","stop_reason":null}]}
636
637data: [DONE]
638
639"#;
640
641 #[tokio::test]
642 async fn test_openai_chat_stream() {
643 use crate::protocols::openai::chat_completions::NvCreateChatCompletionStreamResponse;
644
645 let mut stream = create_message_stream(SAMPLE_CHAT_DATA);
649
650 let mut counter = 0;
651
652 loop {
653 match stream.next().await {
654 Some(Ok(message)) => {
655 let delta: NvCreateChatCompletionStreamResponse =
656 serde_json::from_str(&message.data.unwrap()).unwrap();
657 counter += 1;
658 println!("counter: {}", counter);
659 println!("delta: {:?}", delta);
660 }
661 Some(Err(e)) => {
662 panic!("Error: {:?}", e);
663 }
664 None => {
665 break;
666 }
667 }
668 }
669
670 assert_eq!(counter, 47);
671 }
672
673 #[test]
674 fn test_successful_conversion() {
675 let message = Message {
676 id: Some("123".to_string()),
677 event: Some("update".to_string()),
678 data: Some(r#"{"message": "Hello World"}"#.to_string()),
679 comments: Some(vec!["Some comment".to_string()]),
680 };
681
682 let annotated: Annotated<TestData> = message.try_into().unwrap();
683
684 assert_eq!(annotated.id, Some("123".to_string()));
685 assert_eq!(annotated.event, Some("update".to_string()));
686 assert_eq!(annotated.comment, Some(vec!["Some comment".to_string()]));
687 assert_eq!(
688 annotated.data,
689 Some(TestData {
690 message: "Hello World".to_string()
691 })
692 );
693 }
694
695 #[test]
696 fn test_error_event_with_comments() {
697 let message = Message {
698 id: Some("456".to_string()),
699 event: Some("error".to_string()),
700 data: Some("Error data".to_string()),
701 comments: Some(vec!["An error occurred".to_string()]),
702 };
703
704 let result: Result<Annotated<TestData>, _> = message.try_into();
705
706 assert!(result.is_err());
707 assert_eq!(result.unwrap_err(), "An error occurred".to_string());
708 }
709
710 #[test]
711 fn test_error_event_without_comments() {
712 let message = Message {
713 id: Some("789".to_string()),
714 event: Some("error".to_string()),
715 data: Some("Error data".to_string()),
716 comments: None,
717 };
718
719 let result: Result<Annotated<TestData>, _> = message.try_into();
720
721 assert!(result.is_err());
722 }
723
724 #[test]
725 fn test_invalid_json_data() {
726 let message = Message {
727 id: None,
728 event: Some("update".to_string()),
729 data: Some("Invalid JSON".to_string()),
730 comments: None,
731 };
732
733 let result: Result<Annotated<TestData>, _> = message.try_into();
734
735 assert!(result.is_err());
736 }
737
738 #[test]
739 fn test_missing_data_field() {
740 let message = Message {
741 id: None,
742 event: Some("update".to_string()),
743 data: None,
744 comments: None,
745 };
746
747 let result: Result<Annotated<TestData>, _> = message.try_into();
748
749 assert!(result.is_ok());
750 let annotated = result.unwrap();
751 assert!(annotated.data.is_none());
752 assert_eq!(annotated.event, Some("update".to_string()));
753 }
754}