Skip to main content

spikard_http/
sse.rs

1//! Server-Sent Events (SSE) support for Spikard
2//!
3//! Provides SSE streaming with event generation and lifecycle management.
4
5use axum::{
6    extract::State,
7    response::{
8        IntoResponse,
9        sse::{Event, KeepAlive, Sse},
10    },
11};
12use futures_util::stream;
13use std::{convert::Infallible, sync::Arc, time::Duration};
14use tracing::{debug, error, info};
15
16/// SSE event producer trait
17///
18/// Implement this trait to create custom Server-Sent Event (SSE) producers for your application.
19/// The producer generates events that are streamed to connected clients.
20///
21/// # Understanding SSE
22///
23/// Server-Sent Events (SSE) provide one-way communication from server to client over HTTP.
24/// Unlike WebSocket, SSE uses standard HTTP and automatically handles reconnection.
25/// Use SSE when you need to push data to clients without bidirectional communication.
26///
27/// # Implementing the Trait
28///
29/// You must implement the `next_event` method to generate events. The `on_connect` and
30/// `on_disconnect` methods are optional lifecycle hooks.
31///
32/// # Example
33///
34/// ```ignore
35/// use spikard_http::sse::{SseEventProducer, SseEvent};
36/// use serde_json::json;
37/// use std::time::Duration;
38/// use tokio::time::sleep;
39///
40/// struct CounterProducer {
41///     limit: usize,
42/// }
43///
44/// #[async_trait]
45/// impl SseEventProducer for CounterProducer {
46///     async fn next_event(&self) -> Option<SseEvent> {
47///         static COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
48///
49///         let count = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
50///         if count < self.limit {
51///             Some(SseEvent::new(json!({"count": count})))
52///         } else {
53///             None
54///         }
55///     }
56///
57///     async fn on_connect(&self) {
58///         println!("Client connected");
59///     }
60///
61///     async fn on_disconnect(&self) {
62///         println!("Client disconnected");
63///     }
64/// }
65/// ```
66pub trait SseEventProducer: Send + Sync {
67    /// Generate the next event
68    ///
69    /// Called repeatedly to produce the event stream. Should return `Some(event)` when
70    /// an event is ready to send, or `None` when the stream should end.
71    ///
72    /// # Returns
73    /// * `Some(event)` - Event to send to the client
74    /// * `None` - Stream complete, connection will close
75    fn next_event(&self) -> impl std::future::Future<Output = Option<SseEvent>> + Send;
76
77    /// Called when a client connects to the SSE endpoint
78    ///
79    /// Optional lifecycle hook invoked when a new SSE connection is established.
80    /// Default implementation does nothing.
81    fn on_connect(&self) -> impl std::future::Future<Output = ()> + Send {
82        async {}
83    }
84
85    /// Called when a client disconnects from the SSE endpoint
86    ///
87    /// Optional lifecycle hook invoked when an SSE connection is closed (either by the
88    /// client or the stream ending). Default implementation does nothing.
89    fn on_disconnect(&self) -> impl std::future::Future<Output = ()> + Send {
90        async {}
91    }
92}
93
94/// An individual SSE event
95///
96/// Represents a single Server-Sent Event to be sent to a connected client.
97/// Events can have an optional type, ID, and retry timeout for advanced scenarios.
98///
99/// # Fields
100///
101/// * `event_type` - Optional event type string (used for client-side event filtering)
102/// * `data` - JSON data payload to send to the client
103/// * `id` - Optional event ID (clients can use this to resume after disconnect)
104/// * `retry` - Optional retry timeout in milliseconds (tells client when to reconnect)
105///
106/// # SSE Format
107///
108/// Events are serialized to the following text format:
109/// ```text
110/// event: event_type
111/// data: {"json":"value"}
112/// id: event-123
113/// retry: 3000
114/// ```
115#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
116pub struct SseEvent {
117    /// Event type (optional)
118    pub event_type: Option<String>,
119    /// Event data (JSON value)
120    pub data: serde_json::Value,
121    /// Event ID (optional, for client-side reconnection)
122    pub id: Option<String>,
123    /// Retry timeout in milliseconds (optional)
124    pub retry: Option<u64>,
125}
126
127impl SseEvent {
128    /// Create a new SSE event with data only
129    ///
130    /// Creates a minimal event with just the data payload. Use builder methods
131    /// to add optional fields.
132    ///
133    /// # Arguments
134    /// * `data` - JSON value to send to the client
135    ///
136    /// # Example
137    ///
138    /// ```ignore
139    /// use serde_json::json;
140    /// use spikard_http::sse::SseEvent;
141    ///
142    /// let event = SseEvent::new(json!({"status": "connected"}));
143    /// ```
144    pub fn new(data: serde_json::Value) -> Self {
145        Self {
146            event_type: None,
147            data,
148            id: None,
149            retry: None,
150        }
151    }
152
153    /// Create a new SSE event with an event type and data
154    ///
155    /// Creates an event with a type field. Clients can filter events by type
156    /// in their event listener.
157    ///
158    /// # Arguments
159    /// * `event_type` - String identifying the event type (e.g., "update", "error")
160    /// * `data` - JSON value to send to the client
161    ///
162    /// # Example
163    ///
164    /// ```ignore
165    /// use serde_json::json;
166    /// use spikard_http::sse::SseEvent;
167    ///
168    /// let event = SseEvent::with_type("update", json!({"count": 42}));
169    /// // Client can listen with: eventSource.addEventListener("update", ...)
170    /// ```
171    pub fn with_type(event_type: impl Into<String>, data: serde_json::Value) -> Self {
172        Self {
173            event_type: Some(event_type.into()),
174            data,
175            id: None,
176            retry: None,
177        }
178    }
179
180    /// Set the event ID for client-side reconnection support
181    ///
182    /// Sets an ID that clients can use to resume from this point if they disconnect.
183    /// The client sends this ID back in the `Last-Event-ID` header when reconnecting.
184    ///
185    /// # Arguments
186    /// * `id` - Unique identifier for this event
187    ///
188    /// # Example
189    ///
190    /// ```ignore
191    /// use serde_json::json;
192    /// use spikard_http::sse::SseEvent;
193    ///
194    /// let event = SseEvent::new(json!({"count": 1}))
195    ///     .with_id("event-1");
196    /// ```
197    pub fn with_id(mut self, id: impl Into<String>) -> Self {
198        self.id = Some(id.into());
199        self
200    }
201
202    /// Set the retry timeout for client reconnection
203    ///
204    /// Sets the time in milliseconds clients should wait before attempting to reconnect
205    /// if the connection is lost. The client browser will automatically handle reconnection.
206    ///
207    /// # Arguments
208    /// * `retry_ms` - Retry timeout in milliseconds
209    ///
210    /// # Example
211    ///
212    /// ```ignore
213    /// use serde_json::json;
214    /// use spikard_http::sse::SseEvent;
215    ///
216    /// let event = SseEvent::new(json!({"data": "value"}))
217    ///     .with_retry(5000); // Reconnect after 5 seconds
218    /// ```
219    pub fn with_retry(mut self, retry_ms: u64) -> Self {
220        self.retry = Some(retry_ms);
221        self
222    }
223
224    /// Convert to Axum's SSE Event
225    fn into_axum_event(self) -> Event {
226        let json_data = match serde_json::to_string(&self.data) {
227            Ok(json) => json,
228            Err(e) => {
229                error!("Failed to serialize SSE event data: {}", e);
230                "null".to_string()
231            }
232        };
233
234        let mut event = Event::default().data(json_data);
235
236        if let Some(event_type) = self.event_type {
237            event = event.event(event_type);
238        }
239
240        if let Some(id) = self.id {
241            event = event.id(id);
242        }
243
244        if let Some(retry) = self.retry {
245            event = event.retry(Duration::from_millis(retry));
246        }
247
248        event
249    }
250}
251
252/// SSE state shared across connections
253///
254/// Contains the event producer and optional JSON schema for validating
255/// events. This state is shared among all connections to the same SSE endpoint.
256pub struct SseState<P: SseEventProducer> {
257    /// The event producer implementation
258    producer: Arc<P>,
259    /// Optional JSON Schema for validating outgoing events
260    event_schema: Option<Arc<jsonschema::Validator>>,
261}
262
263impl<P: SseEventProducer> Clone for SseState<P> {
264    fn clone(&self) -> Self {
265        Self {
266            producer: Arc::clone(&self.producer),
267            event_schema: self.event_schema.clone(),
268        }
269    }
270}
271
272impl<P: SseEventProducer + 'static> SseState<P> {
273    /// Create new SSE state with an event producer
274    ///
275    /// Creates a new state without event validation schema.
276    /// Events are not validated.
277    ///
278    /// # Arguments
279    /// * `producer` - The event producer implementation
280    ///
281    /// # Example
282    ///
283    /// ```ignore
284    /// let state = SseState::new(MyProducer);
285    /// ```
286    pub fn new(producer: P) -> Self {
287        Self {
288            producer: Arc::new(producer),
289            event_schema: None,
290        }
291    }
292
293    /// Create new SSE state with an event producer and optional event schema
294    ///
295    /// Creates a new state with optional JSON schema for validating outgoing events.
296    /// If a schema is provided and an event fails validation, it is silently dropped.
297    ///
298    /// # Arguments
299    /// * `producer` - The event producer implementation
300    /// * `event_schema` - Optional JSON schema for validating events
301    ///
302    /// # Returns
303    /// * `Ok(state)` - Successfully created state
304    /// * `Err(msg)` - Invalid schema provided
305    ///
306    /// # Example
307    ///
308    /// ```ignore
309    /// use serde_json::json;
310    ///
311    /// let event_schema = json!({
312    ///     "type": "object",
313    ///     "properties": {
314    ///         "count": {"type": "integer"}
315    ///     }
316    /// });
317    ///
318    /// let state = SseState::with_schema(MyProducer, Some(event_schema))?;
319    /// ```
320    pub fn with_schema(producer: P, event_schema: Option<serde_json::Value>) -> Result<Self, String> {
321        let event_validator = if let Some(schema) = event_schema {
322            Some(Arc::new(
323                jsonschema::validator_for(&schema).map_err(|e| format!("Invalid event schema: {}", e))?,
324            ))
325        } else {
326            None
327        };
328
329        Ok(Self {
330            producer: Arc::new(producer),
331            event_schema: event_validator,
332        })
333    }
334}
335
336/// SSE endpoint handler
337///
338/// This is the main entry point for SSE connections. Use this as an Axum route
339/// handler by passing it to an Axum router's `.route()` method with `get()`.
340///
341/// The handler establishes a connection and streams events from the producer to
342/// the client using the Server-Sent Events protocol (text/event-stream).
343///
344/// # Arguments
345/// * `State(state)` - Application state containing the event producer and optional schema
346///
347/// # Returns
348/// A streaming response with the `text/event-stream` content type
349///
350/// # Example
351///
352/// ```ignore
353/// use axum::{Router, routing::get, extract::State};
354///
355/// let state = SseState::new(MyProducer);
356/// let router = Router::new()
357///     .route("/events", get(sse_handler::<MyProducer>))
358///     .with_state(state);
359///
360/// // Client usage:
361/// // const eventSource = new EventSource('/events');
362/// // eventSource.onmessage = (e) => console.log(e.data);
363/// ```
364pub async fn sse_handler<P: SseEventProducer + 'static>(State(state): State<SseState<P>>) -> impl IntoResponse {
365    info!("SSE client connected");
366
367    state.producer.on_connect().await;
368
369    let producer = Arc::clone(&state.producer);
370    let event_schema = state.event_schema.clone();
371    let stream = stream::unfold((producer, event_schema), |(producer, event_schema)| async move {
372        match producer.next_event().await {
373            Some(sse_event) => {
374                debug!("Sending SSE event: {:?}", sse_event.event_type);
375
376                if let Some(validator) = &event_schema
377                    && !validator.is_valid(&sse_event.data)
378                {
379                    error!("SSE event validation failed");
380                    return Some((
381                        Ok::<_, Infallible>(Event::default().data("validation_error")),
382                        (producer, event_schema),
383                    ));
384                }
385
386                let event = sse_event.into_axum_event();
387                Some((Ok::<_, Infallible>(event), (producer, event_schema)))
388            }
389            None => {
390                info!("SSE stream ended");
391                producer.on_disconnect().await;
392                None
393            }
394        }
395    });
396
397    let sse_response =
398        Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)).text("keep-alive"));
399
400    sse_response.into_response()
401}
402
403#[cfg(test)]
404mod tests {
405    use super::*;
406    use std::sync::Arc;
407    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
408
409    struct TestProducer {
410        count: AtomicUsize,
411    }
412
413    impl SseEventProducer for TestProducer {
414        async fn next_event(&self) -> Option<SseEvent> {
415            let count = self.count.fetch_add(1, Ordering::Relaxed);
416            if count < 3 {
417                Some(SseEvent::new(serde_json::json!({
418                    "message": format!("Event {}", count)
419                })))
420            } else {
421                None
422            }
423        }
424    }
425
426    /// Producer that tracks connect/disconnect lifecycle
427    struct LifecycleProducer {
428        connect_count: Arc<AtomicUsize>,
429        disconnect_count: Arc<AtomicUsize>,
430        event_count: AtomicUsize,
431    }
432
433    impl LifecycleProducer {
434        fn new(connect: Arc<AtomicUsize>, disconnect: Arc<AtomicUsize>) -> Self {
435            Self {
436                connect_count: connect,
437                disconnect_count: disconnect,
438                event_count: AtomicUsize::new(0),
439            }
440        }
441    }
442
443    impl SseEventProducer for LifecycleProducer {
444        async fn next_event(&self) -> Option<SseEvent> {
445            let idx: usize = self.event_count.fetch_add(1, Ordering::Relaxed);
446            if idx < 2 {
447                Some(SseEvent::new(serde_json::json!({"event": idx})))
448            } else {
449                None
450            }
451        }
452
453        async fn on_connect(&self) {
454            self.connect_count.fetch_add(1, Ordering::Relaxed);
455        }
456
457        async fn on_disconnect(&self) {
458            self.disconnect_count.fetch_add(1, Ordering::Relaxed);
459        }
460    }
461
462    /// Producer for multiline event testing
463    struct MultilineProducer {
464        sent: AtomicBool,
465    }
466
467    impl SseEventProducer for MultilineProducer {
468        async fn next_event(&self) -> Option<SseEvent> {
469            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
470            if !was_sent {
471                Some(SseEvent::new(serde_json::json!({
472                    "text": "line1\nline2\nline3"
473                })))
474            } else {
475                None
476            }
477        }
478    }
479
480    /// Producer for special characters testing
481    struct SpecialCharsProducer {
482        sent: AtomicBool,
483    }
484
485    impl SseEventProducer for SpecialCharsProducer {
486        async fn next_event(&self) -> Option<SseEvent> {
487            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
488            if !was_sent {
489                Some(SseEvent::new(serde_json::json!({
490                    "data": "special: \"quotes\", \\ backslash, \t tab, \r\n crlf"
491                })))
492            } else {
493                None
494            }
495        }
496    }
497
498    /// Producer for large payload testing
499    struct LargePayloadProducer {
500        sent: AtomicBool,
501    }
502
503    impl SseEventProducer for LargePayloadProducer {
504        async fn next_event(&self) -> Option<SseEvent> {
505            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
506            if !was_sent {
507                let large_string: String = "x".repeat(100_000);
508                Some(SseEvent::new(serde_json::json!({
509                    "payload": large_string
510                })))
511            } else {
512                None
513            }
514        }
515    }
516
517    /// Producer that sends many events rapidly
518    struct RapidEventProducer {
519        event_count: usize,
520        current: AtomicUsize,
521    }
522
523    impl RapidEventProducer {
524        fn new(count: usize) -> Self {
525            Self {
526                event_count: count,
527                current: AtomicUsize::new(0),
528            }
529        }
530    }
531
532    impl SseEventProducer for RapidEventProducer {
533        async fn next_event(&self) -> Option<SseEvent> {
534            let idx: usize = self.current.fetch_add(1, Ordering::Relaxed);
535            if idx < self.event_count {
536                Some(SseEvent::new(serde_json::json!({
537                    "id": idx,
538                    "data": format!("event_{}", idx)
539                })))
540            } else {
541                None
542            }
543        }
544    }
545
546    /// Producer with all event fields populated
547    struct FullFieldProducer {
548        sent: AtomicBool,
549    }
550
551    impl SseEventProducer for FullFieldProducer {
552        async fn next_event(&self) -> Option<SseEvent> {
553            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
554            if !was_sent {
555                Some(
556                    SseEvent::with_type(
557                        "counter_update",
558                        serde_json::json!({
559                            "count": 42,
560                            "status": "active"
561                        }),
562                    )
563                    .with_id("event-123")
564                    .with_retry(5000),
565                )
566            } else {
567                None
568            }
569        }
570    }
571
572    /// Producer that ends immediately (keep-alive test)
573    struct NoEventProducer;
574
575    impl SseEventProducer for NoEventProducer {
576        async fn next_event(&self) -> Option<SseEvent> {
577            None
578        }
579    }
580
581    #[test]
582    fn test_sse_event_creation_minimal() {
583        let event: SseEvent = SseEvent::new(serde_json::json!({"test": "data"}));
584        assert!(event.event_type.is_none());
585        assert!(event.id.is_none());
586        assert!(event.retry.is_none());
587    }
588
589    #[test]
590    fn test_sse_event_with_all_fields() {
591        let event: SseEvent = SseEvent::with_type("update", serde_json::json!({"count": 42}))
592            .with_id("event-001")
593            .with_retry(3000);
594
595        assert_eq!(event.event_type, Some("update".to_string()));
596        assert_eq!(event.id, Some("event-001".to_string()));
597        assert_eq!(event.retry, Some(3000));
598    }
599
600    #[test]
601    fn test_sse_event_builder_pattern() {
602        let event: SseEvent = SseEvent::with_type("notification", serde_json::json!({"text": "hello"}))
603            .with_id("notif-456")
604            .with_retry(5000);
605
606        assert_eq!(event.event_type, Some("notification".to_string()));
607        assert_eq!(event.id, Some("notif-456".to_string()));
608        assert_eq!(event.retry, Some(5000));
609    }
610
611    #[test]
612    fn test_sse_event_multiline_data() {
613        let event: SseEvent = SseEvent::new(serde_json::json!({
614            "text": "line1\nline2\nline3"
615        }));
616
617        assert!(event.data.is_object());
618        let text: Option<&str> = event.data.get("text").and_then(|v| v.as_str());
619        assert_eq!(text, Some("line1\nline2\nline3"));
620    }
621
622    #[test]
623    fn test_sse_event_special_characters() {
624        let event: SseEvent = SseEvent::new(serde_json::json!({
625            "data": "special: \"quotes\", \\ backslash"
626        }));
627
628        assert!(event.data.is_object());
629    }
630
631    #[test]
632    fn test_sse_event_large_payload() {
633        let large_string: String = "x".repeat(100_000);
634        let event: SseEvent = SseEvent::new(serde_json::json!({
635            "payload": large_string.clone()
636        }));
637
638        let payload_field: Option<&str> = event.data.get("payload").and_then(|v| v.as_str());
639        assert_eq!(payload_field.map(|s| s.len()), Some(100_000));
640    }
641
642    #[test]
643    fn test_sse_event_into_axum_event_conversion() {
644        let event: SseEvent = SseEvent::new(serde_json::json!({"msg": "test"}));
645        let _axum_event: axum::response::sse::Event = event.into_axum_event();
646    }
647
648    #[test]
649    fn test_sse_event_into_axum_with_all_fields() {
650        let event: SseEvent = SseEvent::with_type("event", serde_json::json!({"id": 1}))
651            .with_id("123")
652            .with_retry(5000);
653
654        let _axum_event: axum::response::sse::Event = event.into_axum_event();
655    }
656
657    #[test]
658    fn test_sse_state_creation() {
659        let producer: TestProducer = TestProducer {
660            count: AtomicUsize::new(0),
661        };
662        let state: SseState<TestProducer> = SseState::new(producer);
663        let cloned: SseState<TestProducer> = state.clone();
664        assert!(Arc::ptr_eq(&state.producer, &cloned.producer));
665    }
666
667    #[test]
668    fn test_sse_state_with_schema_valid() {
669        let producer: TestProducer = TestProducer {
670            count: AtomicUsize::new(0),
671        };
672        let schema: serde_json::Value = serde_json::json!({
673            "type": "object",
674            "properties": {
675                "message": {"type": "string"}
676            }
677        });
678
679        let result: Result<SseState<TestProducer>, String> = SseState::with_schema(producer, Some(schema));
680        assert!(result.is_ok());
681    }
682
683    #[test]
684    fn test_sse_state_with_invalid_schema() {
685        let producer: TestProducer = TestProducer {
686            count: AtomicUsize::new(0),
687        };
688        let invalid_schema: serde_json::Value = serde_json::json!({
689            "type": "not-a-valid-type"
690        });
691
692        let result: Result<SseState<TestProducer>, String> = SseState::with_schema(producer, Some(invalid_schema));
693        assert!(result.is_err());
694    }
695
696    #[test]
697    fn test_sse_state_with_schema_none() {
698        let producer: TestProducer = TestProducer {
699            count: AtomicUsize::new(0),
700        };
701        let result: Result<SseState<TestProducer>, String> = SseState::with_schema(producer, None);
702        assert!(result.is_ok());
703    }
704
705    #[tokio::test]
706    async fn test_sse_lifecycle_on_connect_called() {
707        let connect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
708        let disconnect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
709
710        let producer: LifecycleProducer =
711            LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));
712
713        producer.on_connect().await;
714        assert_eq!(connect_count.load(Ordering::Relaxed), 1);
715    }
716
717    #[tokio::test]
718    async fn test_sse_lifecycle_on_disconnect_called() {
719        let connect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
720        let disconnect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
721
722        let producer: LifecycleProducer =
723            LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));
724
725        producer.on_disconnect().await;
726        assert_eq!(disconnect_count.load(Ordering::Relaxed), 1);
727    }
728
729    #[tokio::test]
730    async fn test_sse_event_ordering_preserved() {
731        let producer: RapidEventProducer = RapidEventProducer::new(10);
732
733        let mut last_idx: i32 = -1;
734        for _ in 0..10 {
735            if let Some(event) = producer.next_event().await {
736                if let Some(id) = event.data.get("id").and_then(|v| v.as_i64()) {
737                    assert!(id as i32 > last_idx, "Event ordering violated");
738                    last_idx = id as i32;
739                }
740            }
741        }
742    }
743
744    #[tokio::test]
745    async fn test_sse_rapid_event_sending() {
746        let producer: RapidEventProducer = RapidEventProducer::new(100);
747
748        let mut count: usize = 0;
749        loop {
750            match producer.next_event().await {
751                Some(_event) => count += 1,
752                None => break,
753            }
754        }
755
756        assert_eq!(count, 100);
757    }
758
759    #[test]
760    fn test_sse_event_with_empty_data_object() {
761        let event: SseEvent = SseEvent::new(serde_json::json!({}));
762        assert!(event.data.is_object());
763    }
764
765    #[test]
766    fn test_sse_event_with_nested_data() {
767        let event: SseEvent = SseEvent::new(serde_json::json!({
768            "nested": {
769                "deep": {
770                    "value": "found"
771                }
772            }
773        }));
774
775        let deep_value: Option<&str> = event
776            .data
777            .get("nested")
778            .and_then(|v| v.get("deep"))
779            .and_then(|v| v.get("value"))
780            .and_then(|v| v.as_str());
781
782        assert_eq!(deep_value, Some("found"));
783    }
784
785    #[tokio::test]
786    async fn test_sse_producer_stream_ends_cleanly() {
787        let producer: NoEventProducer = NoEventProducer;
788
789        let event1: Option<SseEvent> = producer.next_event().await;
790        assert!(event1.is_none());
791
792        let event2: Option<SseEvent> = producer.next_event().await;
793        assert!(event2.is_none());
794    }
795
796    #[test]
797    fn test_sse_event_clone() {
798        let original: SseEvent = SseEvent::with_type("test", serde_json::json!({"data": "test"}))
799            .with_id("id-1")
800            .with_retry(2000);
801
802        let cloned: SseEvent = original.clone();
803
804        assert_eq!(cloned.event_type, original.event_type);
805        assert_eq!(cloned.id, original.id);
806        assert_eq!(cloned.retry, original.retry);
807        assert_eq!(cloned.data, original.data);
808    }
809
810    #[test]
811    fn test_sse_event_debug_impl() {
812        let event: SseEvent = SseEvent::new(serde_json::json!({"msg": "debug"}));
813        let debug_str: String = format!("{:?}", event);
814        assert!(debug_str.contains("SseEvent"));
815    }
816
817    #[tokio::test]
818    async fn test_sse_multiple_producers_independent() {
819        let producer1: TestProducer = TestProducer {
820            count: AtomicUsize::new(0),
821        };
822        let producer2: TestProducer = TestProducer {
823            count: AtomicUsize::new(0),
824        };
825
826        let _event1: Option<SseEvent> = producer1.next_event().await;
827        let _event2: Option<SseEvent> = producer2.next_event().await;
828
829        let count1: usize = producer1.count.load(Ordering::Relaxed);
830        let count2: usize = producer2.count.load(Ordering::Relaxed);
831
832        assert_eq!(count1, 1);
833        assert_eq!(count2, 1);
834    }
835
836    #[test]
837    fn test_sse_state_cloning_preserves_schema() {
838        let producer: TestProducer = TestProducer {
839            count: AtomicUsize::new(0),
840        };
841        let schema: serde_json::Value = serde_json::json!({
842            "type": "object",
843            "properties": {
844                "message": {"type": "string"}
845            }
846        });
847
848        let state: SseState<TestProducer> =
849            SseState::with_schema(producer, Some(schema)).expect("schema should be valid");
850        let cloned: SseState<TestProducer> = state.clone();
851
852        assert!(Arc::ptr_eq(&state.producer, &cloned.producer));
853        match (&state.event_schema, &cloned.event_schema) {
854            (Some(s1), Some(s2)) => {
855                assert!(Arc::ptr_eq(s1, s2));
856            }
857            _ => panic!("Schema should be preserved in clone"),
858        }
859    }
860
861    #[tokio::test]
862    async fn test_sse_large_payload_integrity() {
863        let producer: LargePayloadProducer = LargePayloadProducer {
864            sent: AtomicBool::new(false),
865        };
866
867        let event: Option<SseEvent> = producer.next_event().await;
868        assert!(event.is_some());
869
870        if let Some(evt) = event {
871            let payload: Option<&str> = evt.data.get("payload").and_then(|v| v.as_str());
872            assert_eq!(payload.map(|s| s.len()), Some(100_000));
873        }
874    }
875
876    #[tokio::test]
877    async fn test_sse_multiline_data_preservation() {
878        let producer: MultilineProducer = MultilineProducer {
879            sent: AtomicBool::new(false),
880        };
881
882        let event: Option<SseEvent> = producer.next_event().await;
883        assert!(event.is_some());
884
885        if let Some(evt) = event {
886            let text: Option<&str> = evt.data.get("text").and_then(|v| v.as_str());
887            assert_eq!(text, Some("line1\nline2\nline3"));
888        }
889    }
890
891    #[tokio::test]
892    async fn test_sse_special_chars_in_payload() {
893        let producer: SpecialCharsProducer = SpecialCharsProducer {
894            sent: AtomicBool::new(false),
895        };
896
897        let event: Option<SseEvent> = producer.next_event().await;
898        assert!(event.is_some());
899
900        if let Some(evt) = event {
901            let data: Option<&str> = evt.data.get("data").and_then(|v| v.as_str());
902            assert!(data.is_some());
903            assert!(data.unwrap().contains("quotes"));
904        }
905    }
906
907    #[tokio::test]
908    async fn test_sse_full_event_fields_together() {
909        let producer: FullFieldProducer = FullFieldProducer {
910            sent: AtomicBool::new(false),
911        };
912
913        let event: Option<SseEvent> = producer.next_event().await;
914        assert!(event.is_some());
915
916        if let Some(evt) = event {
917            assert_eq!(evt.event_type, Some("counter_update".to_string()));
918            assert_eq!(evt.id, Some("event-123".to_string()));
919            assert_eq!(evt.retry, Some(5000));
920            assert_eq!(evt.data.get("count").and_then(|v| v.as_i64()), Some(42));
921        }
922    }
923
924    #[test]
925    fn test_sse_event_to_axum_preserves_data() {
926        let event = SseEvent::new(serde_json::json!({"key": "value"}));
927        let _axum_event: axum::response::sse::Event = event.into_axum_event();
928    }
929
930    #[test]
931    fn test_sse_event_data_only_no_metadata() {
932        let event = SseEvent::new(serde_json::json!({"message": "hello"}));
933        assert!(event.event_type.is_none(), "event_type should be None");
934        assert!(event.id.is_none(), "id should be None");
935        assert!(event.retry.is_none(), "retry should be None");
936
937        let _axum_event: axum::response::sse::Event = event.into_axum_event();
938    }
939
940    #[test]
941    fn test_sse_event_with_all_fields_filled() {
942        let event = SseEvent::with_type("update", serde_json::json!({"status": "ok"}))
943            .with_id("evt-999")
944            .with_retry(10000);
945
946        assert_eq!(event.event_type.as_ref(), Some(&"update".to_string()));
947        assert_eq!(event.id.as_ref(), Some(&"evt-999".to_string()));
948        assert_eq!(event.retry, Some(10000));
949
950        let _axum_event: axum::response::sse::Event = event.into_axum_event();
951    }
952
953    #[test]
954    fn test_sse_event_empty_data_field() {
955        let event = SseEvent::new(serde_json::json!({}));
956        assert!(event.data.is_object());
957        assert_eq!(event.data.as_object().unwrap().len(), 0);
958
959        let _axum_event: axum::response::sse::Event = event.into_axum_event();
960    }
961
962    #[test]
963    fn test_sse_event_data_with_newlines_in_string() {
964        let multiline_data = "first line\nsecond line\nthird line";
965        let event = SseEvent::new(serde_json::json!({"text": multiline_data}));
966
967        let stored_text = event.data.get("text").and_then(|v| v.as_str());
968        assert_eq!(stored_text, Some(multiline_data));
969
970        let _axum_event: axum::response::sse::Event = event.into_axum_event();
971    }
972
973    #[test]
974    fn test_sse_event_data_with_colons() {
975        let data_with_colons = "key1: value1, key2: value2";
976        let event = SseEvent::new(serde_json::json!({"data": data_with_colons}));
977
978        let stored_data = event.data.get("data").and_then(|v| v.as_str());
979        assert_eq!(stored_data, Some(data_with_colons));
980
981        let _axum_event: axum::response::sse::Event = event.into_axum_event();
982    }
983
984    #[test]
985    fn test_sse_event_comment_only_structure() {
986        let event = SseEvent::new(serde_json::json!({"comment": "this is a comment"}));
987        let _axum_event: axum::response::sse::Event = event.into_axum_event();
988    }
989
990    #[test]
991    fn test_sse_event_type_with_spaces() {
992        let event = SseEvent::with_type("event type with spaces", serde_json::json!({"data": "test"}));
993        assert_eq!(event.event_type, Some("event type with spaces".to_string()));
994
995        let _axum_event: axum::response::sse::Event = event.into_axum_event();
996    }
997
998    #[test]
999    fn test_sse_event_type_with_special_chars() {
1000        let event_types = vec!["update-v2", "event_123", "message.sent", "type-with-dash"];
1001
1002        for event_type in event_types {
1003            let event = SseEvent::with_type(event_type, serde_json::json!({"data": "test"}));
1004            assert_eq!(event.event_type.as_ref(), Some(&event_type.to_string()));
1005
1006            let _axum_event: axum::response::sse::Event = event.into_axum_event();
1007        }
1008    }
1009
1010    #[test]
1011    fn test_sse_event_id_alphanumeric() {
1012        let ids = vec!["123", "abc-def", "event_001", "id-with-dashes-123"];
1013
1014        for id in ids {
1015            let event = SseEvent::new(serde_json::json!({"data": "test"})).with_id(id);
1016            assert_eq!(event.id.as_ref(), Some(&id.to_string()));
1017
1018            let _axum_event: axum::response::sse::Event = event.into_axum_event();
1019        }
1020    }
1021
1022    #[test]
1023    fn test_sse_event_retry_zero() {
1024        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(0);
1025        assert_eq!(event.retry, Some(0));
1026
1027        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1028    }
1029
1030    #[test]
1031    fn test_sse_event_retry_small_value() {
1032        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(100);
1033        assert_eq!(event.retry, Some(100));
1034
1035        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1036    }
1037
1038    #[test]
1039    fn test_sse_event_retry_large_value() {
1040        let large_retry = u64::MAX / 2;
1041        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(large_retry);
1042        assert_eq!(event.retry, Some(large_retry));
1043
1044        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1045    }
1046
1047    #[test]
1048    fn test_sse_event_retry_typical_values() {
1049        let typical_retries = vec![1000, 3000, 5000, 10000, 30000];
1050
1051        for retry_ms in typical_retries {
1052            let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(retry_ms);
1053            assert_eq!(event.retry, Some(retry_ms));
1054
1055            let _axum_event: axum::response::sse::Event = event.into_axum_event();
1056        }
1057    }
1058
1059    #[test]
1060    fn test_sse_event_utf8_emoji_in_data() {
1061        let emoji_data = "Hello 👋 World 🌍";
1062        let event = SseEvent::new(serde_json::json!({"text": emoji_data}));
1063
1064        let stored = event.data.get("text").and_then(|v| v.as_str());
1065        assert_eq!(stored, Some(emoji_data));
1066
1067        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1068    }
1069
1070    #[test]
1071    fn test_sse_event_utf8_chinese_characters() {
1072        let chinese_text = "你好世界";
1073        let event = SseEvent::new(serde_json::json!({"text": chinese_text}));
1074
1075        let stored = event.data.get("text").and_then(|v| v.as_str());
1076        assert_eq!(stored, Some(chinese_text));
1077
1078        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1079    }
1080
1081    #[test]
1082    fn test_sse_event_utf8_arabic_characters() {
1083        let arabic_text = "مرحبا بالعالم";
1084        let event = SseEvent::new(serde_json::json!({"text": arabic_text}));
1085
1086        let stored = event.data.get("text").and_then(|v| v.as_str());
1087        assert_eq!(stored, Some(arabic_text));
1088
1089        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1090    }
1091
1092    #[test]
1093    fn test_sse_event_utf8_mixed_scripts() {
1094        let mixed = "Hello 你好 مرحبا 👋";
1095        let event = SseEvent::new(serde_json::json!({"text": mixed}));
1096
1097        let stored = event.data.get("text").and_then(|v| v.as_str());
1098        assert_eq!(stored, Some(mixed));
1099
1100        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1101    }
1102
1103    #[test]
1104    fn test_sse_event_json_serialization_produces_valid_utf8() {
1105        let event = SseEvent::new(serde_json::json!({"text": "test"}));
1106        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1107    }
1108
1109    #[test]
1110    fn test_sse_event_64kb_payload() {
1111        let large_data = "x".repeat(65536);
1112        let event = SseEvent::new(serde_json::json!({"payload": large_data.clone()}));
1113
1114        let stored = event.data.get("payload").and_then(|v| v.as_str());
1115        assert_eq!(stored.map(|s| s.len()), Some(65536));
1116
1117        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1118    }
1119
1120    #[test]
1121    fn test_sse_event_1mb_payload() {
1122        let large_data = "y".repeat(1_000_000);
1123        let event = SseEvent::new(serde_json::json!({"payload": large_data.clone()}));
1124
1125        let stored = event.data.get("payload").and_then(|v| v.as_str());
1126        assert_eq!(stored.map(|s| s.len()), Some(1_000_000));
1127
1128        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1129    }
1130
1131    #[test]
1132    fn test_sse_event_deeply_nested_json() {
1133        let deeply_nested = serde_json::json!({
1134            "level1": {
1135                "level2": {
1136                    "level3": {
1137                        "level4": {
1138                            "level5": {
1139                                "level6": {
1140                                    "level7": {
1141                                        "value": "deep"
1142                                    }
1143                                }
1144                            }
1145                        }
1146                    }
1147                }
1148            }
1149        });
1150
1151        let event = SseEvent::new(deeply_nested);
1152        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1153    }
1154
1155    #[test]
1156    fn test_sse_event_array_in_data() {
1157        let event = SseEvent::new(serde_json::json!({
1158            "items": [1, 2, 3, 4, 5]
1159        }));
1160
1161        let items = event.data.get("items").and_then(|v| v.as_array());
1162        assert!(items.is_some());
1163        assert_eq!(items.unwrap().len(), 5);
1164
1165        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1166    }
1167
1168    #[test]
1169    fn test_sse_event_null_value_in_data() {
1170        let event = SseEvent::new(serde_json::json!({
1171            "nullable": null
1172        }));
1173
1174        assert!(event.data.get("nullable").unwrap().is_null());
1175
1176        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1177    }
1178
1179    #[test]
1180    fn test_sse_event_boolean_values() {
1181        let event = SseEvent::new(serde_json::json!({
1182            "active": true,
1183            "deleted": false
1184        }));
1185
1186        assert_eq!(event.data.get("active").and_then(|v| v.as_bool()), Some(true));
1187        assert_eq!(event.data.get("deleted").and_then(|v| v.as_bool()), Some(false));
1188
1189        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1190    }
1191
1192    #[tokio::test]
1193    async fn test_sse_last_event_id_header_simulation() {
1194        let producer = RapidEventProducer::new(5);
1195
1196        let mut events = Vec::new();
1197        for _ in 0..5 {
1198            if let Some(evt) = producer.next_event().await {
1199                events.push(evt);
1200            }
1201        }
1202
1203        assert_eq!(events.len(), 5);
1204    }
1205
1206    #[tokio::test]
1207    async fn test_sse_retry_timeout_specification() {
1208        let producer = FullFieldProducer {
1209            sent: AtomicBool::new(false),
1210        };
1211
1212        let event = producer.next_event().await;
1213        assert!(event.is_some());
1214
1215        if let Some(evt) = event {
1216            assert_eq!(evt.retry, Some(5000), "Retry should be 5000ms");
1217        }
1218    }
1219
1220    #[test]
1221    fn test_sse_event_builder_method_chaining() {
1222        let event = SseEvent::new(serde_json::json!({"data": "test"}))
1223            .with_id("id-1")
1224            .with_retry(3000);
1225
1226        assert_eq!(event.id, Some("id-1".to_string()));
1227        assert_eq!(event.retry, Some(3000));
1228
1229        let event2 = SseEvent::with_type("msg", serde_json::json!({"x": 1}))
1230            .with_id("id-2")
1231            .with_retry(5000);
1232
1233        assert_eq!(event2.event_type, Some("msg".to_string()));
1234        assert_eq!(event2.id, Some("id-2".to_string()));
1235        assert_eq!(event2.retry, Some(5000));
1236    }
1237
1238    #[test]
1239    fn test_sse_event_overwriting_fields() {
1240        let event = SseEvent::new(serde_json::json!({"v": 1}))
1241            .with_id("id-original")
1242            .with_retry(1000);
1243
1244        assert_eq!(event.id, Some("id-original".to_string()));
1245        assert_eq!(event.retry, Some(1000));
1246    }
1247
1248    #[test]
1249    fn test_sse_event_type_empty_string() {
1250        let event = SseEvent::with_type("", serde_json::json!({"data": "test"}));
1251        assert_eq!(event.event_type, Some("".to_string()));
1252
1253        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1254    }
1255
1256    #[test]
1257    fn test_sse_event_id_empty_string() {
1258        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_id("");
1259        assert_eq!(event.id, Some("".to_string()));
1260
1261        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1262    }
1263
1264    #[tokio::test]
1265    async fn test_sse_event_sequence_maintains_order() {
1266        let producer = RapidEventProducer::new(10);
1267
1268        let mut event_ids = Vec::new();
1269        for _ in 0..10 {
1270            if let Some(evt) = producer.next_event().await {
1271                if let Some(id) = evt.data.get("id").and_then(|v| v.as_i64()) {
1272                    event_ids.push(id);
1273                }
1274            }
1275        }
1276
1277        for i in 0..event_ids.len() {
1278            assert_eq!(event_ids[i], i as i64, "Event order should match insertion order");
1279        }
1280    }
1281
1282    #[tokio::test]
1283    async fn test_sse_rapid_events_no_loss() {
1284        let producer = RapidEventProducer::new(50);
1285
1286        let mut count = 0;
1287        loop {
1288            match producer.next_event().await {
1289                Some(_) => count += 1,
1290                None => break,
1291            }
1292        }
1293
1294        assert_eq!(count, 50, "All events should be produced without loss");
1295    }
1296
1297    #[tokio::test]
1298    async fn test_sse_event_batching_simulation() {
1299        let producer = RapidEventProducer::new(20);
1300
1301        let mut batch_size = 0;
1302        let mut batch_count = 0;
1303
1304        loop {
1305            match producer.next_event().await {
1306                Some(_evt) => {
1307                    batch_size += 1;
1308                    if batch_size >= 5 {
1309                        batch_count += 1;
1310                        batch_size = 0;
1311                    }
1312                }
1313                None => {
1314                    if batch_size > 0 {
1315                        batch_count += 1;
1316                    }
1317                    break;
1318                }
1319            }
1320        }
1321
1322        assert!(batch_count >= 4, "Should have processed at least 4 batches");
1323    }
1324
1325    #[test]
1326    fn test_sse_state_arc_sharing() {
1327        let producer = TestProducer {
1328            count: AtomicUsize::new(0),
1329        };
1330        let state1 = SseState::new(producer);
1331        let state2 = state1.clone();
1332        let state3 = state2.clone();
1333
1334        assert!(Arc::ptr_eq(&state1.producer, &state2.producer));
1335        assert!(Arc::ptr_eq(&state2.producer, &state3.producer));
1336    }
1337
1338    #[test]
1339    fn test_sse_state_schema_arc_sharing() {
1340        let producer = TestProducer {
1341            count: AtomicUsize::new(0),
1342        };
1343        let schema = serde_json::json!({
1344            "type": "object"
1345        });
1346
1347        let state1 = SseState::with_schema(producer, Some(schema)).expect("schema should be valid");
1348        let state2 = state1.clone();
1349
1350        match (&state1.event_schema, &state2.event_schema) {
1351            (Some(s1), Some(s2)) => {
1352                assert!(Arc::ptr_eq(s1, s2));
1353            }
1354            _ => panic!("Both states should have schema"),
1355        }
1356    }
1357
1358    #[test]
1359    fn test_sse_event_into_axum_event_numeric_data() {
1360        let event = SseEvent::new(serde_json::json!({
1361            "count": 42,
1362            "temperature": 98.6,
1363            "negative": -273
1364        }));
1365
1366        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1367    }
1368
1369    #[test]
1370    fn test_sse_event_json_number_precision() {
1371        let event = SseEvent::new(serde_json::json!({
1372            "float": 3.14159265359,
1373            "large_int": 9007199254740991i64
1374        }));
1375
1376        assert_eq!(event.data.get("float").and_then(|v| v.as_f64()), Some(3.14159265359));
1377
1378        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1379    }
1380
1381    #[test]
1382    fn test_sse_event_string_escaping() {
1383        let event = SseEvent::new(serde_json::json!({
1384            "escaped": "line1\nline2\ttab",
1385            "quotes": "He said \"hello\"",
1386            "backslash": "path\\to\\file"
1387        }));
1388
1389        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1390    }
1391
1392    #[test]
1393    fn test_sse_event_all_json_types_combined() {
1394        let event = SseEvent::new(serde_json::json!({
1395            "string": "text",
1396            "number": 123,
1397            "float": 1.5,
1398            "boolean": true,
1399            "null_value": null,
1400            "array": [1, 2, 3],
1401            "object": {
1402                "nested": "value"
1403            }
1404        }));
1405
1406        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1407    }
1408}