http_kit/
sse.rs

1//! Server-Sent Events (SSE) implementation module.
2//!
3//! This module provides functionality for handling Server-Sent Events, a web standard
4//! that allows a server to push data to a web page in real-time. SSE is useful for
5//! applications that need to stream live data to clients, such as chat applications,
6//! live feeds, or real-time notifications.
7//!
8//! The module includes utilities for formatting SSE messages, managing event streams,
9//! and handling the SSE protocol according to the W3C specification.
10//!
11//! # Examples
12//!
13//! ```rust
14//! // Basic SSE event creation and formatting
15//! use http_kit::sse::Event;
16//!
17//! let event = Event::from_data("Hello, World!").with_id("my-id");
18//! ```
19
20use alloc::string::{String, ToString};
21use alloc::vec::Vec;
22use bytes::Bytes;
23use core::error::Error as StdError;
24use core::fmt;
25use core::pin::Pin;
26use core::task::{Context, Poll};
27use futures_lite::{Stream, StreamExt};
28use http_body::Frame;
29use http_body_util::StreamBody;
30use pin_project_lite::pin_project;
31#[cfg(feature = "json")]
32use serde::Serialize;
33#[cfg(feature = "json")]
34use serde_json::{self, to_string};
35
36use crate::Body;
37
38/// Represents a Server-Sent Event that can be sent to clients.
39#[derive(Debug)]
40pub struct Event {
41    event: Option<String>,
42    data: String,
43    id: Option<String>,
44    retry: Option<u64>,
45}
46
47impl Event {
48    /// Creates a new SSE event from JSON-serializable data.
49    ///
50    /// This helper is available when the `json` feature is enabled and returns an
51    /// error instead of panicking if serialization fails.
52    ///
53    /// # Examples
54    ///
55    /// ```rust
56    /// # #[cfg(feature = "json")]
57    /// # {
58    /// use http_kit::sse::Event;
59    /// use serde::Serialize;
60    ///
61    /// #[derive(Serialize)]
62    /// struct Message { text: String }
63    ///
64    /// # fn demo() -> Result<(), serde_json::Error> {
65    /// let msg = Message { text: "Hello".to_string() };
66    /// let event = Event::new(&msg)?;
67    /// # let _ = event;
68    /// # Ok(())
69    /// # }
70    /// # demo().unwrap();
71    /// # }
72    /// ```
73    #[cfg(feature = "json")]
74    pub fn new<T: Serialize>(data: &T) -> Result<Self, serde_json::Error> {
75        Ok(Self::from_data(to_string(data)?))
76    }
77
78    /// Creates a new SSE event from string data.
79    ///
80    /// # Examples
81    ///
82    /// ```rust
83    /// use http_kit::sse::Event;
84    ///
85    /// let event = Event::from_data("Hello, World!");
86    /// ```
87    pub fn from_data<T: Into<String>>(data: T) -> Self {
88        Self {
89            event: None,
90            data: data.into(),
91            id: None,
92            retry: None,
93        }
94    }
95
96    /// Returns the event ID if set.
97    pub const fn id(&self) -> Option<&str> {
98        if let Some(id) = self.id.as_ref() {
99            Some(id.as_str())
100        } else {
101            None
102        }
103    }
104
105    /// Returns the event type if set.
106    pub const fn event(&self) -> Option<&str> {
107        if let Some(event) = self.event.as_ref() {
108            Some(event.as_str())
109        } else {
110            None
111        }
112    }
113
114    /// Returns the retry duration in milliseconds if set.
115    pub const fn retry(&self) -> Option<u64> {
116        self.retry
117    }
118
119    /// Returns the raw text data of the event.
120    pub const fn text_data(&self) -> &str {
121        self.data.as_str()
122    }
123
124    /// Deserializes the event data as JSON.
125    ///
126    /// This helper is available when the `json` feature is enabled.
127    ///
128    /// # Errors
129    ///
130    /// Returns an error if the data cannot be deserialized as the specified type.
131    #[cfg(feature = "json")]
132    pub fn data<T>(&self) -> Result<T, serde_json::Error>
133    where
134        T: for<'de> serde::Deserialize<'de>,
135    {
136        serde_json::from_str(self.text_data())
137    }
138
139    /// Sets the event ID.
140    ///
141    /// # Examples
142    ///
143    /// ```rust
144    /// use http_kit::sse::Event;
145    ///
146    /// let event = Event::from_data("Hello").with_id("msg-123");
147    /// ```
148    pub fn with_id<T: Into<String>>(mut self, id: T) -> Self {
149        self.id = Some(id.into());
150        self
151    }
152
153    /// Sets the event type.
154    ///
155    /// # Examples
156    ///
157    /// ```rust
158    /// use http_kit::sse::Event;
159    ///
160    /// let event = Event::from_data("Hello").with_event("message");
161    /// ```
162    pub fn with_event<T: Into<String>>(mut self, event: T) -> Self {
163        self.event = Some(event.into());
164        self
165    }
166
167    /// Sets the retry duration in milliseconds.
168    ///
169    /// # Examples
170    ///
171    /// ```rust
172    /// use http_kit::sse::Event;
173    ///
174    /// let event = Event::from_data("Hello").with_retry(5000);
175    /// ```
176    pub fn with_retry(mut self, retry: u64) -> Self {
177        self.retry = Some(retry);
178        self
179    }
180
181    /// Encodes the event as an SSE-formatted string.
182    ///
183    /// The output follows the SSE specification format:
184    /// - `event: <type>` (optional)
185    /// - `data: <data>`
186    /// - `id: <id>` (optional)
187    /// - `retry: <milliseconds>` (optional)
188    /// - Empty line to end the event
189    pub fn encode(&self) -> String {
190        let mut encoded = String::new();
191        if let Some(event) = self.event() {
192            encoded.push_str("event: ");
193            encoded.push_str(event);
194            encoded.push('\n');
195        }
196        encoded.push_str("data: ");
197        encoded.push_str(&self.data);
198        encoded.push('\n');
199
200        if let Some(id) = self.id() {
201            encoded.push_str("id: ");
202            encoded.push_str(id);
203            encoded.push('\n');
204        }
205        if let Some(retry) = self.retry() {
206            encoded.push_str("retry: ");
207            encoded.push_str(&retry.to_string());
208            encoded.push('\n');
209        }
210
211        encoded.push('\n');
212        encoded
213    }
214}
215
216pub(crate) fn into_body<S, E>(stream: S) -> impl http_body::Body<Data = Bytes, Error = E> + Send
217where
218    S: Stream<Item = Result<Event, E>> + Send,
219    E: Send,
220{
221    StreamBody::new(
222        stream.map(|result| {
223            result.map(|event| Frame::data(Bytes::from(event.encode().into_bytes())))
224        }),
225    )
226}
227
228pin_project! {
229    /// A stream wrapper for Server-Sent Events over an HTTP body.
230    ///
231    /// This struct provides a way to parse an incoming HTTP body as a stream of
232    /// Server-Sent Events, allowing you to process SSE data asynchronously.
233    pub struct SseStream{
234        #[pin]
235        body:Body,
236        buffer: Vec<u8>,
237        partial_event: PartialEvent,
238    }
239}
240
241#[derive(Default, Debug)]
242struct PartialEvent {
243    id: Option<String>,
244    event: Option<String>,
245    data: Vec<String>,
246    retry: Option<u64>,
247}
248
249impl SseStream {
250    /// Creates a new SSE stream from an HTTP body.
251    ///
252    /// This function wraps the provided body in an SSE stream parser that can
253    /// asynchronously parse Server-Sent Events from the body data.
254    pub fn new(body: Body) -> Self {
255        Self {
256            body,
257            buffer: Vec::new(),
258            partial_event: PartialEvent::default(),
259        }
260    }
261}
262
263/// Errors that can occur while parsing Server-Sent Events.
264#[derive(Debug, Clone)]
265pub enum ParseError {
266    /// The underlying body stream encountered an error
267    BodyError(String),
268    /// Invalid UTF-8 encoding in the SSE data
269    InvalidUtf8,
270    /// Invalid retry value (not a valid number)
271    InvalidRetryValue,
272}
273
274impl fmt::Display for ParseError {
275    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
276        match self {
277            ParseError::BodyError(e) => write!(f, "Body stream error: {}", e),
278            ParseError::InvalidUtf8 => write!(f, "Invalid UTF-8 in SSE data"),
279            ParseError::InvalidRetryValue => write!(f, "Invalid retry value in SSE event"),
280        }
281    }
282}
283
284impl StdError for ParseError {}
285
286impl Stream for SseStream {
287    type Item = Result<Event, ParseError>;
288
289    fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
290        let mut this = self.project();
291
292        loop {
293            // Try to parse an event from the buffer
294            if let Some(event) = parse_event_from_buffer(this.buffer, this.partial_event) {
295                return Poll::Ready(Some(Ok(event)));
296            }
297
298            // If no complete event, read more data from the body
299            match this.body.as_mut().poll_next(cx) {
300                Poll::Ready(Some(Ok(frame))) => {
301                    this.buffer.extend_from_slice(&frame);
302                }
303                Poll::Ready(Some(Err(e))) => {
304                    return Poll::Ready(Some(Err(ParseError::BodyError(e.to_string()))));
305                }
306                Poll::Ready(None) => {
307                    // Stream ended, check if we have a partial event to emit
308                    if !this.partial_event.data.is_empty() {
309                        return Poll::Ready(Some(Ok(finalize_event(this.partial_event))));
310                    }
311                    return Poll::Ready(None);
312                }
313                Poll::Pending => return Poll::Pending,
314            }
315        }
316    }
317}
318
319fn parse_event_from_buffer(
320    buffer: &mut Vec<u8>,
321    partial_event: &mut PartialEvent,
322) -> Option<Event> {
323    while let Some(line) = read_line(buffer) {
324        if line.is_empty() {
325            if !partial_event.data.is_empty() {
326                return Some(finalize_event(partial_event));
327            }
328            continue;
329        }
330
331        let line = String::from_utf8_lossy(&line);
332
333        if line.starts_with(':') {
334            continue;
335        } else if let Some(data) = line.strip_prefix("data: ") {
336            partial_event.data.push(data.to_string());
337        } else if let Some(data) = line.strip_prefix("data:") {
338            partial_event.data.push(data.to_string());
339        } else if let Some(event_type) = line.strip_prefix("event: ") {
340            partial_event.event = Some(event_type.to_string());
341        } else if let Some(event_type) = line.strip_prefix("event:") {
342            partial_event.event = Some(event_type.to_string());
343        } else if let Some(id) = line.strip_prefix("id: ") {
344            partial_event.id = Some(id.to_string());
345        } else if let Some(id) = line.strip_prefix("id:") {
346            partial_event.id = Some(id.to_string());
347        } else if let Some(retry_str) = line.strip_prefix("retry: ") {
348            if let Ok(retry) = retry_str.parse::<u64>() {
349                partial_event.retry = Some(retry);
350            }
351        } else if let Some(retry_str) = line.strip_prefix("retry:") {
352            if let Ok(retry) = retry_str.parse::<u64>() {
353                partial_event.retry = Some(retry);
354            }
355        }
356    }
357
358    None
359}
360
361fn read_line(buffer: &mut Vec<u8>) -> Option<Vec<u8>> {
362    if buffer.is_empty() {
363        return None;
364    }
365
366    let newline_idx = buffer.iter().position(|b| *b == b'\n' || *b == b'\r')?;
367
368    let line = buffer.drain(..newline_idx).collect::<Vec<u8>>();
369    // Remove the newline character we stopped at.
370    if !buffer.is_empty() {
371        let newline = buffer.remove(0);
372        if newline == b'\r' && !buffer.is_empty() && buffer[0] == b'\n' {
373            buffer.remove(0);
374        }
375    }
376
377    Some(line)
378}
379
380fn finalize_event(partial_event: &mut PartialEvent) -> Event {
381    let event = Event {
382        id: partial_event.id.take(),
383        event: partial_event.event.take(),
384        data: partial_event.data.join("\n"),
385        retry: partial_event.retry.take(),
386    };
387    partial_event.data.clear();
388    event
389}
390
391#[cfg(test)]
392mod tests {
393    use super::*;
394    use alloc::{format, vec};
395    use futures_lite::StreamExt;
396
397    #[tokio::test]
398    async fn test_parse_simple_event() {
399        let data = b"data: Hello World\n\n";
400        let body = Body::from(Bytes::from(&data[..]));
401        let mut stream = SseStream::new(body);
402
403        let event = stream.next().await.unwrap().unwrap();
404        assert_eq!(event.text_data(), "Hello World");
405        assert_eq!(event.event(), None);
406        assert_eq!(event.id(), None);
407    }
408
409    #[tokio::test]
410    async fn test_parse_event_with_type() {
411        let data = b"event: message\ndata: Test message\n\n";
412        let body = Body::from(Bytes::from(&data[..]));
413        let mut stream = SseStream::new(body);
414
415        let event = stream.next().await.unwrap().unwrap();
416        assert_eq!(event.text_data(), "Test message");
417        assert_eq!(event.event(), Some("message"));
418    }
419
420    #[tokio::test]
421    async fn test_parse_event_with_id() {
422        let data = b"id: 123\ndata: Event with ID\n\n";
423        let body = Body::from(Bytes::from(&data[..]));
424        let mut stream = SseStream::new(body);
425
426        let event = stream.next().await.unwrap().unwrap();
427        assert_eq!(event.text_data(), "Event with ID");
428        assert_eq!(event.id(), Some("123"));
429    }
430
431    #[tokio::test]
432    async fn test_parse_event_with_retry() {
433        let data = b"retry: 5000\ndata: Event with retry\n\n";
434        let body = Body::from(Bytes::from(&data[..]));
435        let mut stream = SseStream::new(body);
436
437        let event = stream.next().await.unwrap().unwrap();
438        assert_eq!(event.text_data(), "Event with retry");
439        assert_eq!(event.retry(), Some(5000));
440    }
441
442    #[tokio::test]
443    async fn test_parse_multiline_data() {
444        let data = b"data: Line 1\ndata: Line 2\ndata: Line 3\n\n";
445        let body = Body::from(Bytes::from(&data[..]));
446        let mut stream = SseStream::new(body);
447
448        let event = stream.next().await.unwrap().unwrap();
449        assert_eq!(event.text_data(), "Line 1\nLine 2\nLine 3");
450    }
451
452    #[tokio::test]
453    async fn test_parse_multiple_events() {
454        let data = b"data: First event\n\ndata: Second event\n\n";
455        let body = Body::from(Bytes::from(&data[..]));
456        let mut stream = SseStream::new(body);
457
458        let event1 = stream.next().await.unwrap().unwrap();
459        assert_eq!(event1.text_data(), "First event");
460
461        let event2 = stream.next().await.unwrap().unwrap();
462        assert_eq!(event2.text_data(), "Second event");
463    }
464
465    #[tokio::test]
466    async fn test_parse_event_with_all_fields() {
467        let data = b"id: abc-123\nevent: update\nretry: 3000\ndata: Complete event\n\n";
468        let body = Body::from(Bytes::from(&data[..]));
469        let mut stream = SseStream::new(body);
470
471        let event = stream.next().await.unwrap().unwrap();
472        assert_eq!(event.text_data(), "Complete event");
473        assert_eq!(event.event(), Some("update"));
474        assert_eq!(event.id(), Some("abc-123"));
475        assert_eq!(event.retry(), Some(3000));
476    }
477
478    #[tokio::test]
479    async fn test_ignore_comments() {
480        let data = b": This is a comment\ndata: Actual data\n: Another comment\n\n";
481        let body = Body::from(Bytes::from(&data[..]));
482        let mut stream = SseStream::new(body);
483
484        let event = stream.next().await.unwrap().unwrap();
485        assert_eq!(event.text_data(), "Actual data");
486    }
487
488    #[tokio::test]
489    async fn test_event_encoding() {
490        let event = Event::from_data("Test message")
491            .with_id("123")
492            .with_event("message");
493
494        let encoded = event.encode();
495        assert!(encoded.contains("event: message\n"));
496        assert!(encoded.contains("data: Test message\n"));
497        assert!(encoded.contains("id: 123\n"));
498        assert!(encoded.ends_with("\n\n"));
499    }
500
501    #[cfg(feature = "json")]
502    #[tokio::test]
503    async fn test_json_serialization() {
504        #[derive(Serialize, serde::Deserialize, PartialEq, Debug)]
505        struct TestData {
506            message: String,
507            count: u32,
508        }
509
510        let data = TestData {
511            message: "Hello".to_string(),
512            count: 42,
513        };
514
515        let event = Event::new(&data).unwrap();
516        assert!(event.text_data().contains("\"message\":\"Hello\""));
517        assert!(event.text_data().contains("\"count\":42"));
518
519        // Test deserialization
520        let decoded: TestData = event.data().unwrap();
521        assert_eq!(decoded, data);
522    }
523
524    #[tokio::test]
525    async fn test_stream_chunked_data() {
526        // Simulate data coming in chunks
527        let data = vec![
528            Bytes::from("data: Part"),
529            Bytes::from("ial message\n"),
530            Bytes::from("\ndata: Second"),
531            Bytes::from(" event\n\n"),
532        ];
533
534        let mut combined = Vec::new();
535        for chunk in data {
536            combined.extend_from_slice(&chunk);
537        }
538        let body = Body::from(Bytes::from(combined));
539        let mut sse_stream = SseStream::new(body);
540
541        let event1 = sse_stream.next().await.unwrap().unwrap();
542        assert_eq!(event1.text_data(), "Partial message");
543
544        let event2 = sse_stream.next().await.unwrap().unwrap();
545        assert_eq!(event2.text_data(), "Second event");
546    }
547
548    #[tokio::test]
549    async fn test_empty_data_field() {
550        let data = b"event: ping\ndata: \n\n";
551        let body = Body::from(Bytes::from(&data[..]));
552        let mut stream = SseStream::new(body);
553
554        let event = stream.next().await.unwrap().unwrap();
555        assert_eq!(event.text_data(), "");
556        assert_eq!(event.event(), Some("ping"));
557    }
558
559    // Additional comprehensive unit tests
560
561    #[test]
562    fn test_event_with_retry_method() {
563        let event = Event::from_data("test").with_retry(1000);
564        assert_eq!(event.retry(), Some(1000));
565
566        let encoded = event.encode();
567        assert!(encoded.contains("retry: 1000\n"));
568    }
569
570    #[test]
571    fn test_event_builder_chain() {
572        let event = Event::from_data("Hello")
573            .with_id("msg-1")
574            .with_event("greeting")
575            .with_retry(2000);
576
577        assert_eq!(event.text_data(), "Hello");
578        assert_eq!(event.id(), Some("msg-1"));
579        assert_eq!(event.event(), Some("greeting"));
580        assert_eq!(event.retry(), Some(2000));
581    }
582
583    #[tokio::test]
584    async fn test_parse_event_with_colon_in_data() {
585        let data = b"data: url: https://example.com\n\n";
586        let body = Body::from(Bytes::from(&data[..]));
587        let mut stream = SseStream::new(body);
588
589        let event = stream.next().await.unwrap().unwrap();
590        assert_eq!(event.text_data(), "url: https://example.com");
591    }
592
593    #[tokio::test]
594    async fn test_parse_event_with_empty_lines_between_fields() {
595        let data = b"id: 1\n\ndata: test\n\n";
596        let body = Body::from(Bytes::from(&data[..]));
597        let mut stream = SseStream::new(body);
598
599        let event = stream.next().await.unwrap().unwrap();
600        assert_eq!(event.text_data(), "test");
601        assert_eq!(event.id(), Some("1"));
602    }
603
604    #[tokio::test]
605    async fn test_parse_invalid_retry_value() {
606        let data = b"retry: not_a_number\ndata: test\n\n";
607        let body = Body::from(Bytes::from(&data[..]));
608        let mut stream = SseStream::new(body);
609
610        let event = stream.next().await.unwrap().unwrap();
611        assert_eq!(event.text_data(), "test");
612        assert_eq!(event.retry(), None); // Invalid retry should be ignored
613    }
614
615    #[tokio::test]
616    async fn test_parse_event_with_bom() {
617        // UTF-8 BOM followed by SSE data - BOM is treated as part of the data
618        let data = b"\xEF\xBB\xBFdata: BOM test\n\n";
619        let body = Body::from(Bytes::from(&data[..]));
620        let _unused_stream = SseStream::new(body);
621
622        // BOM is part of the invalid line, so this event won't be parsed
623        // Let's add a proper event after
624        let data = b"\xEF\xBB\xBFdata: BOM test\n\ndata: real event\n\n";
625        let body = Body::from(Bytes::from(&data[..]));
626        let mut stream = SseStream::new(body);
627
628        let event = stream.next().await.unwrap().unwrap();
629        assert_eq!(event.text_data(), "real event");
630    }
631
632    #[tokio::test]
633    async fn test_parse_event_with_windows_line_endings() {
634        let data = b"data: Windows\r\ndata: line endings\r\n\r\n";
635        let body = Body::from(Bytes::from(&data[..]));
636        let mut stream = SseStream::new(body);
637
638        let event = stream.next().await.unwrap().unwrap();
639        // Should handle \r\n properly
640        assert!(event.text_data().contains("Windows"));
641    }
642
643    #[tokio::test]
644    async fn test_parse_event_with_only_comments() {
645        let data = b": comment 1\n: comment 2\n\ndata: real event\n\n";
646        let body = Body::from(Bytes::from(&data[..]));
647        let mut stream = SseStream::new(body);
648
649        // Should skip comment-only block and get the real event
650        let event = stream.next().await.unwrap().unwrap();
651        assert_eq!(event.text_data(), "real event");
652    }
653
654    #[tokio::test]
655    async fn test_parse_event_field_without_space() {
656        let data = b"data:no space\n\n";
657        let body = Body::from(Bytes::from(&data[..]));
658        let mut stream = SseStream::new(body);
659
660        let event = stream.next().await.unwrap().unwrap();
661        assert_eq!(event.text_data(), "no space");
662    }
663
664    #[tokio::test]
665    async fn test_parse_unknown_field() {
666        let data = b"unknown: field\ndata: test\n\n";
667        let body = Body::from(Bytes::from(&data[..]));
668        let mut stream = SseStream::new(body);
669
670        let event = stream.next().await.unwrap().unwrap();
671        assert_eq!(event.text_data(), "test");
672        // Unknown field should be ignored
673    }
674
675    #[tokio::test]
676    async fn test_parse_very_long_event() {
677        let long_data = "x".repeat(10000);
678        let data = format!("data: {}\n\n", long_data);
679        let body = Body::from(Bytes::from(data.into_bytes()));
680        let mut stream = SseStream::new(body);
681
682        let event = stream.next().await.unwrap().unwrap();
683        assert_eq!(event.text_data(), long_data);
684    }
685
686    #[tokio::test]
687    async fn test_stream_ends_with_partial_event() {
688        let data = b"data: partial";
689        let body = Body::from(Bytes::from(&data[..]));
690        let mut stream = SseStream::new(body);
691
692        // The stream ends without a double newline
693        // Currently the implementation emits partial events when the stream ends
694        let result = stream.next().await;
695        if let Some(Ok(event)) = result {
696            assert_eq!(event.text_data(), "partial");
697        } else {
698            // If the parser doesn't emit partial events, that's also valid behavior
699            assert!(result.is_none());
700        }
701    }
702
703    #[tokio::test]
704    async fn test_empty_stream() {
705        let data = b"";
706        let body = Body::from(Bytes::from(&data[..]));
707        let mut stream = SseStream::new(body);
708
709        let result = stream.next().await;
710        assert!(result.is_none());
711    }
712
713    #[test]
714    fn test_parse_error_display() {
715        let error = ParseError::InvalidUtf8;
716        assert_eq!(format!("{}", error), "Invalid UTF-8 in SSE data");
717
718        let error = ParseError::InvalidRetryValue;
719        assert_eq!(format!("{}", error), "Invalid retry value in SSE event");
720
721        let error = ParseError::BodyError("test error".to_string());
722        assert_eq!(format!("{}", error), "Body stream error: test error");
723    }
724
725    #[cfg(feature = "json")]
726    #[test]
727    fn test_event_new_with_complex_type() {
728        use serde_json::json;
729
730        let data = json!({
731            "type": "message",
732            "content": "Hello, SSE!"
733        });
734
735        let event = Event::new(&data).unwrap();
736        assert!(event.text_data().contains("\"type\":\"message\""));
737        assert!(event.text_data().contains("\"content\":\"Hello, SSE!\""));
738    }
739
740    #[tokio::test]
741    async fn test_consecutive_newlines() {
742        let data = b"data: test1\n\n\n\ndata: test2\n\n";
743        let body = Body::from(Bytes::from(&data[..]));
744        let mut stream = SseStream::new(body);
745
746        let event1 = stream.next().await.unwrap().unwrap();
747        assert_eq!(event1.text_data(), "test1");
748
749        let event2 = stream.next().await.unwrap().unwrap();
750        assert_eq!(event2.text_data(), "test2");
751    }
752
753    #[tokio::test]
754    async fn test_data_with_special_characters() {
755        let data = b"data: {\"emoji\": \"\xF0\x9F\x98\x80\"}\n\n";
756        let body = Body::from(Bytes::from(&data[..]));
757        let mut stream = SseStream::new(body);
758
759        let event = stream.next().await.unwrap().unwrap();
760        assert!(event.text_data().contains("😀"));
761    }
762
763    #[tokio::test]
764    async fn test_parse_interleaved_fields() {
765        let data = b"data: part1\nid: 123\ndata: part2\nevent: test\ndata: part3\n\n";
766        let body = Body::from(Bytes::from(&data[..]));
767        let mut stream = SseStream::new(body);
768
769        let event = stream.next().await.unwrap().unwrap();
770        assert_eq!(event.text_data(), "part1\npart2\npart3");
771        assert_eq!(event.id(), Some("123"));
772        assert_eq!(event.event(), Some("test"));
773    }
774
775    #[test]
776    fn test_parse_error_clone() {
777        let error = ParseError::InvalidUtf8;
778        let cloned = error.clone();
779        assert_eq!(format!("{}", cloned), "Invalid UTF-8 in SSE data");
780    }
781
782    #[test]
783    fn test_event_debug() {
784        let event = Event::from_data("test");
785        let debug_str = format!("{:?}", event);
786        assert!(debug_str.contains("Event"));
787        assert!(debug_str.contains("data"));
788    }
789
790    #[tokio::test]
791    async fn test_multiple_empty_data_fields() {
792        let data = b"data: \ndata: \ndata: \n\n";
793        let body = Body::from(Bytes::from(&data[..]));
794        let mut stream = SseStream::new(body);
795
796        let event = stream.next().await.unwrap().unwrap();
797        assert_eq!(event.text_data(), "\n\n");
798    }
799
800    #[tokio::test]
801    async fn test_parse_with_leading_whitespace() {
802        let data = b"  data: test\n\ndata: real event\n\n";
803        let body = Body::from(Bytes::from(&data[..]));
804        let mut stream = SseStream::new(body);
805
806        // Leading whitespace means it's not a valid field, should get the next valid event
807        let event = stream.next().await.unwrap().unwrap();
808        assert_eq!(event.text_data(), "real event");
809    }
810}