1use alloc::string::{String, ToString};
21use alloc::vec::Vec;
22use bytes::Bytes;
23use core::error::Error as StdError;
24use core::fmt;
25use core::pin::Pin;
26use core::task::{Context, Poll};
27use futures_lite::{Stream, StreamExt};
28use http_body::Frame;
29use http_body_util::StreamBody;
30use pin_project_lite::pin_project;
31#[cfg(feature = "json")]
32use serde::Serialize;
33#[cfg(feature = "json")]
34use serde_json::{self, to_string};
35
36use crate::Body;
37
38#[derive(Debug)]
40pub struct Event {
41 event: Option<String>,
42 data: String,
43 id: Option<String>,
44 retry: Option<u64>,
45}
46
47impl Event {
48 #[cfg(feature = "json")]
74 pub fn new<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
75 Ok(Self::from_data(to_string(data)?))
76 }
77
78 pub fn from_data<T: Into<String>>(data: T) -> Self {
88 Self {
89 event: None,
90 data: data.into(),
91 id: None,
92 retry: None,
93 }
94 }
95
96 pub const fn id(&self) -> Option<&str> {
98 if let Some(id) = self.id.as_ref() {
99 Some(id.as_str())
100 } else {
101 None
102 }
103 }
104
105 pub const fn event(&self) -> Option<&str> {
107 if let Some(event) = self.event.as_ref() {
108 Some(event.as_str())
109 } else {
110 None
111 }
112 }
113
114 pub const fn retry(&self) -> Option<u64> {
116 self.retry
117 }
118
119 pub const fn text_data(&self) -> &str {
121 self.data.as_str()
122 }
123
124 #[cfg(feature = "json")]
132 pub fn data<T>(&self) -> Result<T, serde_json::Error>
133 where
134 T: for<'de> serde::Deserialize<'de>,
135 {
136 serde_json::from_str(self.text_data())
137 }
138
139 pub fn with_id<T: Into<String>>(mut self, id: T) -> Self {
149 self.id = Some(id.into());
150 self
151 }
152
153 pub fn with_event<T: Into<String>>(mut self, event: T) -> Self {
163 self.event = Some(event.into());
164 self
165 }
166
167 pub fn with_retry(mut self, retry: u64) -> Self {
177 self.retry = Some(retry);
178 self
179 }
180
181 pub fn encode(&self) -> String {
190 let mut encoded = String::new();
191 if let Some(event) = self.event() {
192 encoded.push_str("event: ");
193 encoded.push_str(event);
194 encoded.push('\n');
195 }
196 encoded.push_str("data: ");
197 encoded.push_str(&self.data);
198 encoded.push('\n');
199
200 if let Some(id) = self.id() {
201 encoded.push_str("id: ");
202 encoded.push_str(id);
203 encoded.push('\n');
204 }
205 if let Some(retry) = self.retry() {
206 encoded.push_str("retry: ");
207 encoded.push_str(&retry.to_string());
208 encoded.push('\n');
209 }
210
211 encoded.push('\n');
212 encoded
213 }
214}
215
216pub(crate) fn into_body<S, E>(stream: S) -> impl http_body::Body<Data = Bytes, Error = E> + Send
217where
218 S: Stream<Item = Result<Event, E>> + Send,
219 E: Send,
220{
221 StreamBody::new(
222 stream.map(|result| {
223 result.map(|event| Frame::data(Bytes::from(event.encode().into_bytes())))
224 }),
225 )
226}
227
228pin_project! {
229 pub struct SseStream{
234 #[pin]
235 body:Body,
236 buffer: Vec<u8>,
237 partial_event: PartialEvent,
238 }
239}
240
241#[derive(Default, Debug)]
242struct PartialEvent {
243 id: Option<String>,
244 event: Option<String>,
245 data: Vec<String>,
246 retry: Option<u64>,
247}
248
249impl SseStream {
250 pub fn new(body: Body) -> Self {
255 Self {
256 body,
257 buffer: Vec::new(),
258 partial_event: PartialEvent::default(),
259 }
260 }
261}
262
263#[derive(Debug, Clone)]
265pub enum ParseError {
266 BodyError(String),
268 InvalidUtf8,
270 InvalidRetryValue,
272}
273
274impl fmt::Display for ParseError {
275 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276 match self {
277 ParseError::BodyError(e) => write!(f, "Body stream error: {}", e),
278 ParseError::InvalidUtf8 => write!(f, "Invalid UTF-8 in SSE data"),
279 ParseError::InvalidRetryValue => write!(f, "Invalid retry value in SSE event"),
280 }
281 }
282}
283
284impl StdError for ParseError {}
285
286impl Stream for SseStream {
287 type Item = Result<Event, ParseError>;
288
289 fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
290 let mut this = self.project();
291
292 loop {
293 if let Some(event) = parse_event_from_buffer(this.buffer, this.partial_event) {
295 return Poll::Ready(Some(Ok(event)));
296 }
297
298 match this.body.as_mut().poll_next(cx) {
300 Poll::Ready(Some(Ok(frame))) => {
301 this.buffer.extend_from_slice(&frame);
302 }
303 Poll::Ready(Some(Err(e))) => {
304 return Poll::Ready(Some(Err(ParseError::BodyError(e.to_string()))));
305 }
306 Poll::Ready(None) => {
307 if !this.partial_event.data.is_empty() {
309 return Poll::Ready(Some(Ok(finalize_event(this.partial_event))));
310 }
311 return Poll::Ready(None);
312 }
313 Poll::Pending => return Poll::Pending,
314 }
315 }
316 }
317}
318
319fn parse_event_from_buffer(
320 buffer: &mut Vec<u8>,
321 partial_event: &mut PartialEvent,
322) -> Option<Event> {
323 while let Some(line) = read_line(buffer) {
324 if line.is_empty() {
325 if !partial_event.data.is_empty() {
326 return Some(finalize_event(partial_event));
327 }
328 continue;
329 }
330
331 let line = String::from_utf8_lossy(&line);
332
333 if line.starts_with(':') {
334 continue;
335 } else if let Some(data) = line.strip_prefix("data: ") {
336 partial_event.data.push(data.to_string());
337 } else if let Some(data) = line.strip_prefix("data:") {
338 partial_event.data.push(data.to_string());
339 } else if let Some(event_type) = line.strip_prefix("event: ") {
340 partial_event.event = Some(event_type.to_string());
341 } else if let Some(event_type) = line.strip_prefix("event:") {
342 partial_event.event = Some(event_type.to_string());
343 } else if let Some(id) = line.strip_prefix("id: ") {
344 partial_event.id = Some(id.to_string());
345 } else if let Some(id) = line.strip_prefix("id:") {
346 partial_event.id = Some(id.to_string());
347 } else if let Some(retry_str) = line.strip_prefix("retry: ") {
348 if let Ok(retry) = retry_str.parse::<u64>() {
349 partial_event.retry = Some(retry);
350 }
351 } else if let Some(retry_str) = line.strip_prefix("retry:") {
352 if let Ok(retry) = retry_str.parse::<u64>() {
353 partial_event.retry = Some(retry);
354 }
355 }
356 }
357
358 None
359}
360
361fn read_line(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
362 if buffer.is_empty() {
363 return None;
364 }
365
366 let newline_idx = buffer.iter().position(|b| *b == b'\n' || *b == b'\r')?;
367
368 let line = buffer.drain(..newline_idx).collect::<Vec<u8>>();
369 if !buffer.is_empty() {
371 let newline = buffer.remove(0);
372 if newline == b'\r' && !buffer.is_empty() && buffer[0] == b'\n' {
373 buffer.remove(0);
374 }
375 }
376
377 Some(line)
378}
379
380fn finalize_event(partial_event: &mut PartialEvent) -> Event {
381 let event = Event {
382 id: partial_event.id.take(),
383 event: partial_event.event.take(),
384 data: partial_event.data.join("\n"),
385 retry: partial_event.retry.take(),
386 };
387 partial_event.data.clear();
388 event
389}
390
391#[cfg(test)]
392mod tests {
393 use super::*;
394 use alloc::{format, vec};
395 use futures_lite::StreamExt;
396
397 #[tokio::test]
398 async fn test_parse_simple_event() {
399 let data = b"data: Hello World\n\n";
400 let body = Body::from(Bytes::from(&data[..]));
401 let mut stream = SseStream::new(body);
402
403 let event = stream.next().await.unwrap().unwrap();
404 assert_eq!(event.text_data(), "Hello World");
405 assert_eq!(event.event(), None);
406 assert_eq!(event.id(), None);
407 }
408
409 #[tokio::test]
410 async fn test_parse_event_with_type() {
411 let data = b"event: message\ndata: Test message\n\n";
412 let body = Body::from(Bytes::from(&data[..]));
413 let mut stream = SseStream::new(body);
414
415 let event = stream.next().await.unwrap().unwrap();
416 assert_eq!(event.text_data(), "Test message");
417 assert_eq!(event.event(), Some("message"));
418 }
419
420 #[tokio::test]
421 async fn test_parse_event_with_id() {
422 let data = b"id: 123\ndata: Event with ID\n\n";
423 let body = Body::from(Bytes::from(&data[..]));
424 let mut stream = SseStream::new(body);
425
426 let event = stream.next().await.unwrap().unwrap();
427 assert_eq!(event.text_data(), "Event with ID");
428 assert_eq!(event.id(), Some("123"));
429 }
430
431 #[tokio::test]
432 async fn test_parse_event_with_retry() {
433 let data = b"retry: 5000\ndata: Event with retry\n\n";
434 let body = Body::from(Bytes::from(&data[..]));
435 let mut stream = SseStream::new(body);
436
437 let event = stream.next().await.unwrap().unwrap();
438 assert_eq!(event.text_data(), "Event with retry");
439 assert_eq!(event.retry(), Some(5000));
440 }
441
442 #[tokio::test]
443 async fn test_parse_multiline_data() {
444 let data = b"data: Line 1\ndata: Line 2\ndata: Line 3\n\n";
445 let body = Body::from(Bytes::from(&data[..]));
446 let mut stream = SseStream::new(body);
447
448 let event = stream.next().await.unwrap().unwrap();
449 assert_eq!(event.text_data(), "Line 1\nLine 2\nLine 3");
450 }
451
452 #[tokio::test]
453 async fn test_parse_multiple_events() {
454 let data = b"data: First event\n\ndata: Second event\n\n";
455 let body = Body::from(Bytes::from(&data[..]));
456 let mut stream = SseStream::new(body);
457
458 let event1 = stream.next().await.unwrap().unwrap();
459 assert_eq!(event1.text_data(), "First event");
460
461 let event2 = stream.next().await.unwrap().unwrap();
462 assert_eq!(event2.text_data(), "Second event");
463 }
464
465 #[tokio::test]
466 async fn test_parse_event_with_all_fields() {
467 let data = b"id: abc-123\nevent: update\nretry: 3000\ndata: Complete event\n\n";
468 let body = Body::from(Bytes::from(&data[..]));
469 let mut stream = SseStream::new(body);
470
471 let event = stream.next().await.unwrap().unwrap();
472 assert_eq!(event.text_data(), "Complete event");
473 assert_eq!(event.event(), Some("update"));
474 assert_eq!(event.id(), Some("abc-123"));
475 assert_eq!(event.retry(), Some(3000));
476 }
477
478 #[tokio::test]
479 async fn test_ignore_comments() {
480 let data = b": This is a comment\ndata: Actual data\n: Another comment\n\n";
481 let body = Body::from(Bytes::from(&data[..]));
482 let mut stream = SseStream::new(body);
483
484 let event = stream.next().await.unwrap().unwrap();
485 assert_eq!(event.text_data(), "Actual data");
486 }
487
488 #[tokio::test]
489 async fn test_event_encoding() {
490 let event = Event::from_data("Test message")
491 .with_id("123")
492 .with_event("message");
493
494 let encoded = event.encode();
495 assert!(encoded.contains("event: message\n"));
496 assert!(encoded.contains("data: Test message\n"));
497 assert!(encoded.contains("id: 123\n"));
498 assert!(encoded.ends_with("\n\n"));
499 }
500
501 #[cfg(feature = "json")]
502 #[tokio::test]
503 async fn test_json_serialization() {
504 #[derive(Serialize, serde::Deserialize, PartialEq, Debug)]
505 struct TestData {
506 message: String,
507 count: u32,
508 }
509
510 let data = TestData {
511 message: "Hello".to_string(),
512 count: 42,
513 };
514
515 let event = Event::new(&data).unwrap();
516 assert!(event.text_data().contains("\"message\":\"Hello\""));
517 assert!(event.text_data().contains("\"count\":42"));
518
519 let decoded: TestData = event.data().unwrap();
521 assert_eq!(decoded, data);
522 }
523
524 #[tokio::test]
525 async fn test_stream_chunked_data() {
526 let data = vec![
528 Bytes::from("data: Part"),
529 Bytes::from("ial message\n"),
530 Bytes::from("\ndata: Second"),
531 Bytes::from(" event\n\n"),
532 ];
533
534 let mut combined = Vec::new();
535 for chunk in data {
536 combined.extend_from_slice(&chunk);
537 }
538 let body = Body::from(Bytes::from(combined));
539 let mut sse_stream = SseStream::new(body);
540
541 let event1 = sse_stream.next().await.unwrap().unwrap();
542 assert_eq!(event1.text_data(), "Partial message");
543
544 let event2 = sse_stream.next().await.unwrap().unwrap();
545 assert_eq!(event2.text_data(), "Second event");
546 }
547
548 #[tokio::test]
549 async fn test_empty_data_field() {
550 let data = b"event: ping\ndata: \n\n";
551 let body = Body::from(Bytes::from(&data[..]));
552 let mut stream = SseStream::new(body);
553
554 let event = stream.next().await.unwrap().unwrap();
555 assert_eq!(event.text_data(), "");
556 assert_eq!(event.event(), Some("ping"));
557 }
558
559 #[test]
562 fn test_event_with_retry_method() {
563 let event = Event::from_data("test").with_retry(1000);
564 assert_eq!(event.retry(), Some(1000));
565
566 let encoded = event.encode();
567 assert!(encoded.contains("retry: 1000\n"));
568 }
569
570 #[test]
571 fn test_event_builder_chain() {
572 let event = Event::from_data("Hello")
573 .with_id("msg-1")
574 .with_event("greeting")
575 .with_retry(2000);
576
577 assert_eq!(event.text_data(), "Hello");
578 assert_eq!(event.id(), Some("msg-1"));
579 assert_eq!(event.event(), Some("greeting"));
580 assert_eq!(event.retry(), Some(2000));
581 }
582
583 #[tokio::test]
584 async fn test_parse_event_with_colon_in_data() {
585 let data = b"data: url: https://example.com\n\n";
586 let body = Body::from(Bytes::from(&data[..]));
587 let mut stream = SseStream::new(body);
588
589 let event = stream.next().await.unwrap().unwrap();
590 assert_eq!(event.text_data(), "url: https://example.com");
591 }
592
593 #[tokio::test]
594 async fn test_parse_event_with_empty_lines_between_fields() {
595 let data = b"id: 1\n\ndata: test\n\n";
596 let body = Body::from(Bytes::from(&data[..]));
597 let mut stream = SseStream::new(body);
598
599 let event = stream.next().await.unwrap().unwrap();
600 assert_eq!(event.text_data(), "test");
601 assert_eq!(event.id(), Some("1"));
602 }
603
604 #[tokio::test]
605 async fn test_parse_invalid_retry_value() {
606 let data = b"retry: not_a_number\ndata: test\n\n";
607 let body = Body::from(Bytes::from(&data[..]));
608 let mut stream = SseStream::new(body);
609
610 let event = stream.next().await.unwrap().unwrap();
611 assert_eq!(event.text_data(), "test");
612 assert_eq!(event.retry(), None); }
614
615 #[tokio::test]
616 async fn test_parse_event_with_bom() {
617 let data = b"\xEF\xBB\xBFdata: BOM test\n\n";
619 let body = Body::from(Bytes::from(&data[..]));
620 let _unused_stream = SseStream::new(body);
621
622 let data = b"\xEF\xBB\xBFdata: BOM test\n\ndata: real event\n\n";
625 let body = Body::from(Bytes::from(&data[..]));
626 let mut stream = SseStream::new(body);
627
628 let event = stream.next().await.unwrap().unwrap();
629 assert_eq!(event.text_data(), "real event");
630 }
631
632 #[tokio::test]
633 async fn test_parse_event_with_windows_line_endings() {
634 let data = b"data: Windows\r\ndata: line endings\r\n\r\n";
635 let body = Body::from(Bytes::from(&data[..]));
636 let mut stream = SseStream::new(body);
637
638 let event = stream.next().await.unwrap().unwrap();
639 assert!(event.text_data().contains("Windows"));
641 }
642
643 #[tokio::test]
644 async fn test_parse_event_with_only_comments() {
645 let data = b": comment 1\n: comment 2\n\ndata: real event\n\n";
646 let body = Body::from(Bytes::from(&data[..]));
647 let mut stream = SseStream::new(body);
648
649 let event = stream.next().await.unwrap().unwrap();
651 assert_eq!(event.text_data(), "real event");
652 }
653
654 #[tokio::test]
655 async fn test_parse_event_field_without_space() {
656 let data = b"data:no space\n\n";
657 let body = Body::from(Bytes::from(&data[..]));
658 let mut stream = SseStream::new(body);
659
660 let event = stream.next().await.unwrap().unwrap();
661 assert_eq!(event.text_data(), "no space");
662 }
663
664 #[tokio::test]
665 async fn test_parse_unknown_field() {
666 let data = b"unknown: field\ndata: test\n\n";
667 let body = Body::from(Bytes::from(&data[..]));
668 let mut stream = SseStream::new(body);
669
670 let event = stream.next().await.unwrap().unwrap();
671 assert_eq!(event.text_data(), "test");
672 }
674
675 #[tokio::test]
676 async fn test_parse_very_long_event() {
677 let long_data = "x".repeat(10000);
678 let data = format!("data: {}\n\n", long_data);
679 let body = Body::from(Bytes::from(data.into_bytes()));
680 let mut stream = SseStream::new(body);
681
682 let event = stream.next().await.unwrap().unwrap();
683 assert_eq!(event.text_data(), long_data);
684 }
685
686 #[tokio::test]
687 async fn test_stream_ends_with_partial_event() {
688 let data = b"data: partial";
689 let body = Body::from(Bytes::from(&data[..]));
690 let mut stream = SseStream::new(body);
691
692 let result = stream.next().await;
695 if let Some(Ok(event)) = result {
696 assert_eq!(event.text_data(), "partial");
697 } else {
698 assert!(result.is_none());
700 }
701 }
702
703 #[tokio::test]
704 async fn test_empty_stream() {
705 let data = b"";
706 let body = Body::from(Bytes::from(&data[..]));
707 let mut stream = SseStream::new(body);
708
709 let result = stream.next().await;
710 assert!(result.is_none());
711 }
712
713 #[test]
714 fn test_parse_error_display() {
715 let error = ParseError::InvalidUtf8;
716 assert_eq!(format!("{}", error), "Invalid UTF-8 in SSE data");
717
718 let error = ParseError::InvalidRetryValue;
719 assert_eq!(format!("{}", error), "Invalid retry value in SSE event");
720
721 let error = ParseError::BodyError("test error".to_string());
722 assert_eq!(format!("{}", error), "Body stream error: test error");
723 }
724
725 #[cfg(feature = "json")]
726 #[test]
727 fn test_event_new_with_complex_type() {
728 use serde_json::json;
729
730 let data = json!({
731 "type": "message",
732 "content": "Hello, SSE!"
733 });
734
735 let event = Event::new(&data).unwrap();
736 assert!(event.text_data().contains("\"type\":\"message\""));
737 assert!(event.text_data().contains("\"content\":\"Hello, SSE!\""));
738 }
739
740 #[tokio::test]
741 async fn test_consecutive_newlines() {
742 let data = b"data: test1\n\n\n\ndata: test2\n\n";
743 let body = Body::from(Bytes::from(&data[..]));
744 let mut stream = SseStream::new(body);
745
746 let event1 = stream.next().await.unwrap().unwrap();
747 assert_eq!(event1.text_data(), "test1");
748
749 let event2 = stream.next().await.unwrap().unwrap();
750 assert_eq!(event2.text_data(), "test2");
751 }
752
753 #[tokio::test]
754 async fn test_data_with_special_characters() {
755 let data = b"data: {\"emoji\": \"\xF0\x9F\x98\x80\"}\n\n";
756 let body = Body::from(Bytes::from(&data[..]));
757 let mut stream = SseStream::new(body);
758
759 let event = stream.next().await.unwrap().unwrap();
760 assert!(event.text_data().contains("😀"));
761 }
762
763 #[tokio::test]
764 async fn test_parse_interleaved_fields() {
765 let data = b"data: part1\nid: 123\ndata: part2\nevent: test\ndata: part3\n\n";
766 let body = Body::from(Bytes::from(&data[..]));
767 let mut stream = SseStream::new(body);
768
769 let event = stream.next().await.unwrap().unwrap();
770 assert_eq!(event.text_data(), "part1\npart2\npart3");
771 assert_eq!(event.id(), Some("123"));
772 assert_eq!(event.event(), Some("test"));
773 }
774
775 #[test]
776 fn test_parse_error_clone() {
777 let error = ParseError::InvalidUtf8;
778 let cloned = error.clone();
779 assert_eq!(format!("{}", cloned), "Invalid UTF-8 in SSE data");
780 }
781
782 #[test]
783 fn test_event_debug() {
784 let event = Event::from_data("test");
785 let debug_str = format!("{:?}", event);
786 assert!(debug_str.contains("Event"));
787 assert!(debug_str.contains("data"));
788 }
789
790 #[tokio::test]
791 async fn test_multiple_empty_data_fields() {
792 let data = b"data: \ndata: \ndata: \n\n";
793 let body = Body::from(Bytes::from(&data[..]));
794 let mut stream = SseStream::new(body);
795
796 let event = stream.next().await.unwrap().unwrap();
797 assert_eq!(event.text_data(), "\n\n");
798 }
799
800 #[tokio::test]
801 async fn test_parse_with_leading_whitespace() {
802 let data = b" data: test\n\ndata: real event\n\n";
803 let body = Body::from(Bytes::from(&data[..]));
804 let mut stream = SseStream::new(body);
805
806 let event = stream.next().await.unwrap().unwrap();
808 assert_eq!(event.text_data(), "real event");
809 }
810}