spikard_http/
sse.rs

1//! Server-Sent Events (SSE) support for Spikard
2//!
3//! Provides SSE streaming with event generation and lifecycle management.
4
5use axum::{
6    extract::State,
7    response::{
8        IntoResponse,
9        sse::{Event, KeepAlive, Sse},
10    },
11};
12use futures_util::stream;
13use serde_json::Value;
14use std::{convert::Infallible, sync::Arc, time::Duration};
15use tracing::{debug, error, info};
16
17/// SSE event producer trait
18///
19/// Implement this trait to create custom Server-Sent Event (SSE) producers for your application.
20/// The producer generates events that are streamed to connected clients.
21///
22/// # Understanding SSE
23///
24/// Server-Sent Events (SSE) provide one-way communication from server to client over HTTP.
25/// Unlike WebSocket, SSE uses standard HTTP and automatically handles reconnection.
26/// Use SSE when you need to push data to clients without bidirectional communication.
27///
28/// # Implementing the Trait
29///
30/// You must implement the `next_event` method to generate events. The `on_connect` and
31/// `on_disconnect` methods are optional lifecycle hooks.
32///
33/// # Example
34///
35/// ```ignore
36/// use spikard_http::sse::{SseEventProducer, SseEvent};
37/// use serde_json::json;
38/// use std::time::Duration;
39/// use tokio::time::sleep;
40///
41/// struct CounterProducer {
42///     limit: usize,
43/// }
44///
45/// #[async_trait]
46/// impl SseEventProducer for CounterProducer {
47///     async fn next_event(&self) -> Option<SseEvent> {
48///         static COUNTER: std::sync::atomic::AtomicUsize = std::sync::atomic::AtomicUsize::new(0);
49///
50///         let count = COUNTER.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
51///         if count < self.limit {
52///             Some(SseEvent::new(json!({"count": count})))
53///         } else {
54///             None
55///         }
56///     }
57///
58///     async fn on_connect(&self) {
59///         println!("Client connected");
60///     }
61///
62///     async fn on_disconnect(&self) {
63///         println!("Client disconnected");
64///     }
65/// }
66/// ```
67pub trait SseEventProducer: Send + Sync {
68    /// Generate the next event
69    ///
70    /// Called repeatedly to produce the event stream. Should return `Some(event)` when
71    /// an event is ready to send, or `None` when the stream should end.
72    ///
73    /// # Returns
74    /// * `Some(event)` - Event to send to the client
75    /// * `None` - Stream complete, connection will close
76    fn next_event(&self) -> impl std::future::Future<Output = Option<SseEvent>> + Send;
77
78    /// Called when a client connects to the SSE endpoint
79    ///
80    /// Optional lifecycle hook invoked when a new SSE connection is established.
81    /// Default implementation does nothing.
82    fn on_connect(&self) -> impl std::future::Future<Output = ()> + Send {
83        async {}
84    }
85
86    /// Called when a client disconnects from the SSE endpoint
87    ///
88    /// Optional lifecycle hook invoked when an SSE connection is closed (either by the
89    /// client or the stream ending). Default implementation does nothing.
90    fn on_disconnect(&self) -> impl std::future::Future<Output = ()> + Send {
91        async {}
92    }
93}
94
95/// An individual SSE event
96///
97/// Represents a single Server-Sent Event to be sent to a connected client.
98/// Events can have an optional type, ID, and retry timeout for advanced scenarios.
99///
100/// # Fields
101///
102/// * `event_type` - Optional event type string (used for client-side event filtering)
103/// * `data` - JSON data payload to send to the client
104/// * `id` - Optional event ID (clients can use this to resume after disconnect)
105/// * `retry` - Optional retry timeout in milliseconds (tells client when to reconnect)
106///
107/// # SSE Format
108///
109/// Events are serialized to the following text format:
110/// ```text
111/// event: event_type
112/// data: {"json":"value"}
113/// id: event-123
114/// retry: 3000
115/// ```
116#[derive(Debug, Clone)]
117pub struct SseEvent {
118    /// Event type (optional)
119    pub event_type: Option<String>,
120    /// Event data (JSON value)
121    pub data: Value,
122    /// Event ID (optional, for client-side reconnection)
123    pub id: Option<String>,
124    /// Retry timeout in milliseconds (optional)
125    pub retry: Option<u64>,
126}
127
128impl SseEvent {
129    /// Create a new SSE event with data only
130    ///
131    /// Creates a minimal event with just the data payload. Use builder methods
132    /// to add optional fields.
133    ///
134    /// # Arguments
135    /// * `data` - JSON value to send to the client
136    ///
137    /// # Example
138    ///
139    /// ```ignore
140    /// use serde_json::json;
141    /// use spikard_http::sse::SseEvent;
142    ///
143    /// let event = SseEvent::new(json!({"status": "connected"}));
144    /// ```
145    pub fn new(data: Value) -> Self {
146        Self {
147            event_type: None,
148            data,
149            id: None,
150            retry: None,
151        }
152    }
153
154    /// Create a new SSE event with an event type and data
155    ///
156    /// Creates an event with a type field. Clients can filter events by type
157    /// in their event listener.
158    ///
159    /// # Arguments
160    /// * `event_type` - String identifying the event type (e.g., "update", "error")
161    /// * `data` - JSON value to send to the client
162    ///
163    /// # Example
164    ///
165    /// ```ignore
166    /// use serde_json::json;
167    /// use spikard_http::sse::SseEvent;
168    ///
169    /// let event = SseEvent::with_type("update", json!({"count": 42}));
170    /// // Client can listen with: eventSource.addEventListener("update", ...)
171    /// ```
172    pub fn with_type(event_type: impl Into<String>, data: Value) -> Self {
173        Self {
174            event_type: Some(event_type.into()),
175            data,
176            id: None,
177            retry: None,
178        }
179    }
180
181    /// Set the event ID for client-side reconnection support
182    ///
183    /// Sets an ID that clients can use to resume from this point if they disconnect.
184    /// The client sends this ID back in the `Last-Event-ID` header when reconnecting.
185    ///
186    /// # Arguments
187    /// * `id` - Unique identifier for this event
188    ///
189    /// # Example
190    ///
191    /// ```ignore
192    /// use serde_json::json;
193    /// use spikard_http::sse::SseEvent;
194    ///
195    /// let event = SseEvent::new(json!({"count": 1}))
196    ///     .with_id("event-1");
197    /// ```
198    pub fn with_id(mut self, id: impl Into<String>) -> Self {
199        self.id = Some(id.into());
200        self
201    }
202
203    /// Set the retry timeout for client reconnection
204    ///
205    /// Sets the time in milliseconds clients should wait before attempting to reconnect
206    /// if the connection is lost. The client browser will automatically handle reconnection.
207    ///
208    /// # Arguments
209    /// * `retry_ms` - Retry timeout in milliseconds
210    ///
211    /// # Example
212    ///
213    /// ```ignore
214    /// use serde_json::json;
215    /// use spikard_http::sse::SseEvent;
216    ///
217    /// let event = SseEvent::new(json!({"data": "value"}))
218    ///     .with_retry(5000); // Reconnect after 5 seconds
219    /// ```
220    pub fn with_retry(mut self, retry_ms: u64) -> Self {
221        self.retry = Some(retry_ms);
222        self
223    }
224
225    /// Convert to Axum's SSE Event
226    fn into_axum_event(self) -> Event {
227        let json_data = match serde_json::to_string(&self.data) {
228            Ok(json) => json,
229            Err(e) => {
230                error!("Failed to serialize SSE event data: {}", e);
231                "null".to_string()
232            }
233        };
234
235        let mut event = Event::default().data(json_data);
236
237        if let Some(event_type) = self.event_type {
238            event = event.event(event_type);
239        }
240
241        if let Some(id) = self.id {
242            event = event.id(id);
243        }
244
245        if let Some(retry) = self.retry {
246            event = event.retry(Duration::from_millis(retry));
247        }
248
249        event
250    }
251}
252
253/// SSE state shared across connections
254///
255/// Contains the event producer and optional JSON schema for validating
256/// events. This state is shared among all connections to the same SSE endpoint.
257pub struct SseState<P: SseEventProducer> {
258    /// The event producer implementation
259    producer: Arc<P>,
260    /// Optional JSON Schema for validating outgoing events
261    event_schema: Option<Arc<jsonschema::Validator>>,
262}
263
264impl<P: SseEventProducer> Clone for SseState<P> {
265    fn clone(&self) -> Self {
266        Self {
267            producer: Arc::clone(&self.producer),
268            event_schema: self.event_schema.clone(),
269        }
270    }
271}
272
273impl<P: SseEventProducer + 'static> SseState<P> {
274    /// Create new SSE state with an event producer
275    ///
276    /// Creates a new state without event validation schema.
277    /// Events are not validated.
278    ///
279    /// # Arguments
280    /// * `producer` - The event producer implementation
281    ///
282    /// # Example
283    ///
284    /// ```ignore
285    /// let state = SseState::new(MyProducer);
286    /// ```
287    pub fn new(producer: P) -> Self {
288        Self {
289            producer: Arc::new(producer),
290            event_schema: None,
291        }
292    }
293
294    /// Create new SSE state with an event producer and optional event schema
295    ///
296    /// Creates a new state with optional JSON schema for validating outgoing events.
297    /// If a schema is provided and an event fails validation, it is silently dropped.
298    ///
299    /// # Arguments
300    /// * `producer` - The event producer implementation
301    /// * `event_schema` - Optional JSON schema for validating events
302    ///
303    /// # Returns
304    /// * `Ok(state)` - Successfully created state
305    /// * `Err(msg)` - Invalid schema provided
306    ///
307    /// # Example
308    ///
309    /// ```ignore
310    /// use serde_json::json;
311    ///
312    /// let event_schema = json!({
313    ///     "type": "object",
314    ///     "properties": {
315    ///         "count": {"type": "integer"}
316    ///     }
317    /// });
318    ///
319    /// let state = SseState::with_schema(MyProducer, Some(event_schema))?;
320    /// ```
321    pub fn with_schema(producer: P, event_schema: Option<serde_json::Value>) -> Result<Self, String> {
322        let event_validator = if let Some(schema) = event_schema {
323            Some(Arc::new(
324                jsonschema::validator_for(&schema).map_err(|e| format!("Invalid event schema: {}", e))?,
325            ))
326        } else {
327            None
328        };
329
330        Ok(Self {
331            producer: Arc::new(producer),
332            event_schema: event_validator,
333        })
334    }
335}
336
337/// SSE endpoint handler
338///
339/// This is the main entry point for SSE connections. Use this as an Axum route
340/// handler by passing it to an Axum router's `.route()` method with `get()`.
341///
342/// The handler establishes a connection and streams events from the producer to
343/// the client using the Server-Sent Events protocol (text/event-stream).
344///
345/// # Arguments
346/// * `State(state)` - Application state containing the event producer and optional schema
347///
348/// # Returns
349/// A streaming response with the `text/event-stream` content type
350///
351/// # Example
352///
353/// ```ignore
354/// use axum::{Router, routing::get, extract::State};
355///
356/// let state = SseState::new(MyProducer);
357/// let router = Router::new()
358///     .route("/events", get(sse_handler::<MyProducer>))
359///     .with_state(state);
360///
361/// // Client usage:
362/// // const eventSource = new EventSource('/events');
363/// // eventSource.onmessage = (e) => console.log(e.data);
364/// ```
365pub async fn sse_handler<P: SseEventProducer + 'static>(State(state): State<SseState<P>>) -> impl IntoResponse {
366    info!("SSE client connected");
367
368    state.producer.on_connect().await;
369
370    let producer = Arc::clone(&state.producer);
371    let event_schema = state.event_schema.clone();
372    let stream = stream::unfold((producer, event_schema), |(producer, event_schema)| async move {
373        match producer.next_event().await {
374            Some(sse_event) => {
375                debug!("Sending SSE event: {:?}", sse_event.event_type);
376
377                if let Some(validator) = &event_schema
378                    && !validator.is_valid(&sse_event.data)
379                {
380                    error!("SSE event validation failed");
381                    return Some((
382                        Ok::<_, Infallible>(Event::default().data("validation_error")),
383                        (producer, event_schema),
384                    ));
385                }
386
387                let event = sse_event.into_axum_event();
388                Some((Ok::<_, Infallible>(event), (producer, event_schema)))
389            }
390            None => {
391                info!("SSE stream ended");
392                producer.on_disconnect().await;
393                None
394            }
395        }
396    });
397
398    let sse_response =
399        Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)).text("keep-alive"));
400
401    sse_response.into_response()
402}
403
404#[cfg(test)]
405mod tests {
406    use super::*;
407    use std::sync::Arc;
408    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
409
410    struct TestProducer {
411        count: AtomicUsize,
412    }
413
414    impl SseEventProducer for TestProducer {
415        async fn next_event(&self) -> Option<SseEvent> {
416            let count = self.count.fetch_add(1, Ordering::Relaxed);
417            if count < 3 {
418                Some(SseEvent::new(serde_json::json!({
419                    "message": format!("Event {}", count)
420                })))
421            } else {
422                None
423            }
424        }
425    }
426
427    /// Producer that tracks connect/disconnect lifecycle
428    struct LifecycleProducer {
429        connect_count: Arc<AtomicUsize>,
430        disconnect_count: Arc<AtomicUsize>,
431        event_count: AtomicUsize,
432    }
433
434    impl LifecycleProducer {
435        fn new(connect: Arc<AtomicUsize>, disconnect: Arc<AtomicUsize>) -> Self {
436            Self {
437                connect_count: connect,
438                disconnect_count: disconnect,
439                event_count: AtomicUsize::new(0),
440            }
441        }
442    }
443
444    impl SseEventProducer for LifecycleProducer {
445        async fn next_event(&self) -> Option<SseEvent> {
446            let idx: usize = self.event_count.fetch_add(1, Ordering::Relaxed);
447            if idx < 2 {
448                Some(SseEvent::new(serde_json::json!({"event": idx})))
449            } else {
450                None
451            }
452        }
453
454        async fn on_connect(&self) {
455            self.connect_count.fetch_add(1, Ordering::Relaxed);
456        }
457
458        async fn on_disconnect(&self) {
459            self.disconnect_count.fetch_add(1, Ordering::Relaxed);
460        }
461    }
462
463    /// Producer for multiline event testing
464    struct MultilineProducer {
465        sent: AtomicBool,
466    }
467
468    impl SseEventProducer for MultilineProducer {
469        async fn next_event(&self) -> Option<SseEvent> {
470            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
471            if !was_sent {
472                Some(SseEvent::new(serde_json::json!({
473                    "text": "line1\nline2\nline3"
474                })))
475            } else {
476                None
477            }
478        }
479    }
480
481    /// Producer for special characters testing
482    struct SpecialCharsProducer {
483        sent: AtomicBool,
484    }
485
486    impl SseEventProducer for SpecialCharsProducer {
487        async fn next_event(&self) -> Option<SseEvent> {
488            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
489            if !was_sent {
490                Some(SseEvent::new(serde_json::json!({
491                    "data": "special: \"quotes\", \\ backslash, \t tab, \r\n crlf"
492                })))
493            } else {
494                None
495            }
496        }
497    }
498
499    /// Producer for large payload testing
500    struct LargePayloadProducer {
501        sent: AtomicBool,
502    }
503
504    impl SseEventProducer for LargePayloadProducer {
505        async fn next_event(&self) -> Option<SseEvent> {
506            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
507            if !was_sent {
508                let large_string: String = "x".repeat(100_000);
509                Some(SseEvent::new(serde_json::json!({
510                    "payload": large_string
511                })))
512            } else {
513                None
514            }
515        }
516    }
517
518    /// Producer that sends many events rapidly
519    struct RapidEventProducer {
520        event_count: usize,
521        current: AtomicUsize,
522    }
523
524    impl RapidEventProducer {
525        fn new(count: usize) -> Self {
526            Self {
527                event_count: count,
528                current: AtomicUsize::new(0),
529            }
530        }
531    }
532
533    impl SseEventProducer for RapidEventProducer {
534        async fn next_event(&self) -> Option<SseEvent> {
535            let idx: usize = self.current.fetch_add(1, Ordering::Relaxed);
536            if idx < self.event_count {
537                Some(SseEvent::new(serde_json::json!({
538                    "id": idx,
539                    "data": format!("event_{}", idx)
540                })))
541            } else {
542                None
543            }
544        }
545    }
546
547    /// Producer with all event fields populated
548    struct FullFieldProducer {
549        sent: AtomicBool,
550    }
551
552    impl SseEventProducer for FullFieldProducer {
553        async fn next_event(&self) -> Option<SseEvent> {
554            let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
555            if !was_sent {
556                Some(
557                    SseEvent::with_type(
558                        "counter_update",
559                        serde_json::json!({
560                            "count": 42,
561                            "status": "active"
562                        }),
563                    )
564                    .with_id("event-123")
565                    .with_retry(5000),
566                )
567            } else {
568                None
569            }
570        }
571    }
572
573    /// Producer that ends immediately (keep-alive test)
574    struct NoEventProducer;
575
576    impl SseEventProducer for NoEventProducer {
577        async fn next_event(&self) -> Option<SseEvent> {
578            None
579        }
580    }
581
582    #[test]
583    fn test_sse_event_creation_minimal() {
584        let event: SseEvent = SseEvent::new(serde_json::json!({"test": "data"}));
585        assert!(event.event_type.is_none());
586        assert!(event.id.is_none());
587        assert!(event.retry.is_none());
588    }
589
590    #[test]
591    fn test_sse_event_with_all_fields() {
592        let event: SseEvent = SseEvent::with_type("update", serde_json::json!({"count": 42}))
593            .with_id("event-001")
594            .with_retry(3000);
595
596        assert_eq!(event.event_type, Some("update".to_string()));
597        assert_eq!(event.id, Some("event-001".to_string()));
598        assert_eq!(event.retry, Some(3000));
599    }
600
601    #[test]
602    fn test_sse_event_builder_pattern() {
603        let event: SseEvent = SseEvent::with_type("notification", serde_json::json!({"text": "hello"}))
604            .with_id("notif-456")
605            .with_retry(5000);
606
607        assert_eq!(event.event_type, Some("notification".to_string()));
608        assert_eq!(event.id, Some("notif-456".to_string()));
609        assert_eq!(event.retry, Some(5000));
610    }
611
612    #[test]
613    fn test_sse_event_multiline_data() {
614        let event: SseEvent = SseEvent::new(serde_json::json!({
615            "text": "line1\nline2\nline3"
616        }));
617
618        assert!(event.data.is_object());
619        let text: Option<&str> = event.data.get("text").and_then(|v| v.as_str());
620        assert_eq!(text, Some("line1\nline2\nline3"));
621    }
622
623    #[test]
624    fn test_sse_event_special_characters() {
625        let event: SseEvent = SseEvent::new(serde_json::json!({
626            "data": "special: \"quotes\", \\ backslash"
627        }));
628
629        assert!(event.data.is_object());
630    }
631
632    #[test]
633    fn test_sse_event_large_payload() {
634        let large_string: String = "x".repeat(100_000);
635        let event: SseEvent = SseEvent::new(serde_json::json!({
636            "payload": large_string.clone()
637        }));
638
639        let payload_field: Option<&str> = event.data.get("payload").and_then(|v| v.as_str());
640        assert_eq!(payload_field.map(|s| s.len()), Some(100_000));
641    }
642
643    #[test]
644    fn test_sse_event_into_axum_event_conversion() {
645        let event: SseEvent = SseEvent::new(serde_json::json!({"msg": "test"}));
646        let _axum_event: axum::response::sse::Event = event.into_axum_event();
647    }
648
649    #[test]
650    fn test_sse_event_into_axum_with_all_fields() {
651        let event: SseEvent = SseEvent::with_type("event", serde_json::json!({"id": 1}))
652            .with_id("123")
653            .with_retry(5000);
654
655        let _axum_event: axum::response::sse::Event = event.into_axum_event();
656    }
657
658    #[test]
659    fn test_sse_state_creation() {
660        let producer: TestProducer = TestProducer {
661            count: AtomicUsize::new(0),
662        };
663        let state: SseState<TestProducer> = SseState::new(producer);
664        let cloned: SseState<TestProducer> = state.clone();
665        assert!(Arc::ptr_eq(&state.producer, &cloned.producer));
666    }
667
668    #[test]
669    fn test_sse_state_with_schema_valid() {
670        let producer: TestProducer = TestProducer {
671            count: AtomicUsize::new(0),
672        };
673        let schema: serde_json::Value = serde_json::json!({
674            "type": "object",
675            "properties": {
676                "message": {"type": "string"}
677            }
678        });
679
680        let result: Result<SseState<TestProducer>, String> = SseState::with_schema(producer, Some(schema));
681        assert!(result.is_ok());
682    }
683
684    #[test]
685    fn test_sse_state_with_invalid_schema() {
686        let producer: TestProducer = TestProducer {
687            count: AtomicUsize::new(0),
688        };
689        let invalid_schema: serde_json::Value = serde_json::json!({
690            "type": "not-a-valid-type"
691        });
692
693        let result: Result<SseState<TestProducer>, String> = SseState::with_schema(producer, Some(invalid_schema));
694        assert!(result.is_err());
695    }
696
697    #[test]
698    fn test_sse_state_with_schema_none() {
699        let producer: TestProducer = TestProducer {
700            count: AtomicUsize::new(0),
701        };
702        let result: Result<SseState<TestProducer>, String> = SseState::with_schema(producer, None);
703        assert!(result.is_ok());
704    }
705
706    #[tokio::test]
707    async fn test_sse_lifecycle_on_connect_called() {
708        let connect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
709        let disconnect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
710
711        let producer: LifecycleProducer =
712            LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));
713
714        producer.on_connect().await;
715        assert_eq!(connect_count.load(Ordering::Relaxed), 1);
716    }
717
718    #[tokio::test]
719    async fn test_sse_lifecycle_on_disconnect_called() {
720        let connect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
721        let disconnect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
722
723        let producer: LifecycleProducer =
724            LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));
725
726        producer.on_disconnect().await;
727        assert_eq!(disconnect_count.load(Ordering::Relaxed), 1);
728    }
729
730    #[tokio::test]
731    async fn test_sse_event_ordering_preserved() {
732        let producer: RapidEventProducer = RapidEventProducer::new(10);
733
734        let mut last_idx: i32 = -1;
735        for _ in 0..10 {
736            if let Some(event) = producer.next_event().await {
737                if let Some(id) = event.data.get("id").and_then(|v| v.as_i64()) {
738                    assert!(id as i32 > last_idx, "Event ordering violated");
739                    last_idx = id as i32;
740                }
741            }
742        }
743    }
744
745    #[tokio::test]
746    async fn test_sse_rapid_event_sending() {
747        let producer: RapidEventProducer = RapidEventProducer::new(100);
748
749        let mut count: usize = 0;
750        loop {
751            match producer.next_event().await {
752                Some(_event) => count += 1,
753                None => break,
754            }
755        }
756
757        assert_eq!(count, 100);
758    }
759
760    #[test]
761    fn test_sse_event_with_empty_data_object() {
762        let event: SseEvent = SseEvent::new(serde_json::json!({}));
763        assert!(event.data.is_object());
764    }
765
766    #[test]
767    fn test_sse_event_with_nested_data() {
768        let event: SseEvent = SseEvent::new(serde_json::json!({
769            "nested": {
770                "deep": {
771                    "value": "found"
772                }
773            }
774        }));
775
776        let deep_value: Option<&str> = event
777            .data
778            .get("nested")
779            .and_then(|v| v.get("deep"))
780            .and_then(|v| v.get("value"))
781            .and_then(|v| v.as_str());
782
783        assert_eq!(deep_value, Some("found"));
784    }
785
786    #[tokio::test]
787    async fn test_sse_producer_stream_ends_cleanly() {
788        let producer: NoEventProducer = NoEventProducer;
789
790        let event1: Option<SseEvent> = producer.next_event().await;
791        assert!(event1.is_none());
792
793        let event2: Option<SseEvent> = producer.next_event().await;
794        assert!(event2.is_none());
795    }
796
797    #[test]
798    fn test_sse_event_clone() {
799        let original: SseEvent = SseEvent::with_type("test", serde_json::json!({"data": "test"}))
800            .with_id("id-1")
801            .with_retry(2000);
802
803        let cloned: SseEvent = original.clone();
804
805        assert_eq!(cloned.event_type, original.event_type);
806        assert_eq!(cloned.id, original.id);
807        assert_eq!(cloned.retry, original.retry);
808        assert_eq!(cloned.data, original.data);
809    }
810
811    #[test]
812    fn test_sse_event_debug_impl() {
813        let event: SseEvent = SseEvent::new(serde_json::json!({"msg": "debug"}));
814        let debug_str: String = format!("{:?}", event);
815        assert!(debug_str.contains("SseEvent"));
816    }
817
818    #[tokio::test]
819    async fn test_sse_multiple_producers_independent() {
820        let producer1: TestProducer = TestProducer {
821            count: AtomicUsize::new(0),
822        };
823        let producer2: TestProducer = TestProducer {
824            count: AtomicUsize::new(0),
825        };
826
827        let _event1: Option<SseEvent> = producer1.next_event().await;
828        let _event2: Option<SseEvent> = producer2.next_event().await;
829
830        let count1: usize = producer1.count.load(Ordering::Relaxed);
831        let count2: usize = producer2.count.load(Ordering::Relaxed);
832
833        assert_eq!(count1, 1);
834        assert_eq!(count2, 1);
835    }
836
837    #[test]
838    fn test_sse_state_cloning_preserves_schema() {
839        let producer: TestProducer = TestProducer {
840            count: AtomicUsize::new(0),
841        };
842        let schema: serde_json::Value = serde_json::json!({
843            "type": "object",
844            "properties": {
845                "message": {"type": "string"}
846            }
847        });
848
849        let state: SseState<TestProducer> =
850            SseState::with_schema(producer, Some(schema)).expect("schema should be valid");
851        let cloned: SseState<TestProducer> = state.clone();
852
853        assert!(Arc::ptr_eq(&state.producer, &cloned.producer));
854        match (&state.event_schema, &cloned.event_schema) {
855            (Some(s1), Some(s2)) => {
856                assert!(Arc::ptr_eq(s1, s2));
857            }
858            _ => panic!("Schema should be preserved in clone"),
859        }
860    }
861
862    #[tokio::test]
863    async fn test_sse_large_payload_integrity() {
864        let producer: LargePayloadProducer = LargePayloadProducer {
865            sent: AtomicBool::new(false),
866        };
867
868        let event: Option<SseEvent> = producer.next_event().await;
869        assert!(event.is_some());
870
871        if let Some(evt) = event {
872            let payload: Option<&str> = evt.data.get("payload").and_then(|v| v.as_str());
873            assert_eq!(payload.map(|s| s.len()), Some(100_000));
874        }
875    }
876
877    #[tokio::test]
878    async fn test_sse_multiline_data_preservation() {
879        let producer: MultilineProducer = MultilineProducer {
880            sent: AtomicBool::new(false),
881        };
882
883        let event: Option<SseEvent> = producer.next_event().await;
884        assert!(event.is_some());
885
886        if let Some(evt) = event {
887            let text: Option<&str> = evt.data.get("text").and_then(|v| v.as_str());
888            assert_eq!(text, Some("line1\nline2\nline3"));
889        }
890    }
891
892    #[tokio::test]
893    async fn test_sse_special_chars_in_payload() {
894        let producer: SpecialCharsProducer = SpecialCharsProducer {
895            sent: AtomicBool::new(false),
896        };
897
898        let event: Option<SseEvent> = producer.next_event().await;
899        assert!(event.is_some());
900
901        if let Some(evt) = event {
902            let data: Option<&str> = evt.data.get("data").and_then(|v| v.as_str());
903            assert!(data.is_some());
904            assert!(data.unwrap().contains("quotes"));
905        }
906    }
907
908    #[tokio::test]
909    async fn test_sse_full_event_fields_together() {
910        let producer: FullFieldProducer = FullFieldProducer {
911            sent: AtomicBool::new(false),
912        };
913
914        let event: Option<SseEvent> = producer.next_event().await;
915        assert!(event.is_some());
916
917        if let Some(evt) = event {
918            assert_eq!(evt.event_type, Some("counter_update".to_string()));
919            assert_eq!(evt.id, Some("event-123".to_string()));
920            assert_eq!(evt.retry, Some(5000));
921            assert_eq!(evt.data.get("count").and_then(|v| v.as_i64()), Some(42));
922        }
923    }
924
925    #[test]
926    fn test_sse_event_to_axum_preserves_data() {
927        let event = SseEvent::new(serde_json::json!({"key": "value"}));
928        let _axum_event: axum::response::sse::Event = event.into_axum_event();
929    }
930
931    #[test]
932    fn test_sse_event_data_only_no_metadata() {
933        let event = SseEvent::new(serde_json::json!({"message": "hello"}));
934        assert!(event.event_type.is_none(), "event_type should be None");
935        assert!(event.id.is_none(), "id should be None");
936        assert!(event.retry.is_none(), "retry should be None");
937
938        let _axum_event: axum::response::sse::Event = event.into_axum_event();
939    }
940
941    #[test]
942    fn test_sse_event_with_all_fields_filled() {
943        let event = SseEvent::with_type("update", serde_json::json!({"status": "ok"}))
944            .with_id("evt-999")
945            .with_retry(10000);
946
947        assert_eq!(event.event_type.as_ref(), Some(&"update".to_string()));
948        assert_eq!(event.id.as_ref(), Some(&"evt-999".to_string()));
949        assert_eq!(event.retry, Some(10000));
950
951        let _axum_event: axum::response::sse::Event = event.into_axum_event();
952    }
953
954    #[test]
955    fn test_sse_event_empty_data_field() {
956        let event = SseEvent::new(serde_json::json!({}));
957        assert!(event.data.is_object());
958        assert_eq!(event.data.as_object().unwrap().len(), 0);
959
960        let _axum_event: axum::response::sse::Event = event.into_axum_event();
961    }
962
963    #[test]
964    fn test_sse_event_data_with_newlines_in_string() {
965        let multiline_data = "first line\nsecond line\nthird line";
966        let event = SseEvent::new(serde_json::json!({"text": multiline_data}));
967
968        let stored_text = event.data.get("text").and_then(|v| v.as_str());
969        assert_eq!(stored_text, Some(multiline_data));
970
971        let _axum_event: axum::response::sse::Event = event.into_axum_event();
972    }
973
974    #[test]
975    fn test_sse_event_data_with_colons() {
976        let data_with_colons = "key1: value1, key2: value2";
977        let event = SseEvent::new(serde_json::json!({"data": data_with_colons}));
978
979        let stored_data = event.data.get("data").and_then(|v| v.as_str());
980        assert_eq!(stored_data, Some(data_with_colons));
981
982        let _axum_event: axum::response::sse::Event = event.into_axum_event();
983    }
984
985    #[test]
986    fn test_sse_event_comment_only_structure() {
987        let event = SseEvent::new(serde_json::json!({"comment": "this is a comment"}));
988        let _axum_event: axum::response::sse::Event = event.into_axum_event();
989    }
990
991    #[test]
992    fn test_sse_event_type_with_spaces() {
993        let event = SseEvent::with_type("event type with spaces", serde_json::json!({"data": "test"}));
994        assert_eq!(event.event_type, Some("event type with spaces".to_string()));
995
996        let _axum_event: axum::response::sse::Event = event.into_axum_event();
997    }
998
999    #[test]
1000    fn test_sse_event_type_with_special_chars() {
1001        let event_types = vec!["update-v2", "event_123", "message.sent", "type-with-dash"];
1002
1003        for event_type in event_types {
1004            let event = SseEvent::with_type(event_type, serde_json::json!({"data": "test"}));
1005            assert_eq!(event.event_type.as_ref(), Some(&event_type.to_string()));
1006
1007            let _axum_event: axum::response::sse::Event = event.into_axum_event();
1008        }
1009    }
1010
1011    #[test]
1012    fn test_sse_event_id_alphanumeric() {
1013        let ids = vec!["123", "abc-def", "event_001", "id-with-dashes-123"];
1014
1015        for id in ids {
1016            let event = SseEvent::new(serde_json::json!({"data": "test"})).with_id(id);
1017            assert_eq!(event.id.as_ref(), Some(&id.to_string()));
1018
1019            let _axum_event: axum::response::sse::Event = event.into_axum_event();
1020        }
1021    }
1022
1023    #[test]
1024    fn test_sse_event_retry_zero() {
1025        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(0);
1026        assert_eq!(event.retry, Some(0));
1027
1028        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1029    }
1030
1031    #[test]
1032    fn test_sse_event_retry_small_value() {
1033        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(100);
1034        assert_eq!(event.retry, Some(100));
1035
1036        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1037    }
1038
1039    #[test]
1040    fn test_sse_event_retry_large_value() {
1041        let large_retry = u64::MAX / 2;
1042        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(large_retry);
1043        assert_eq!(event.retry, Some(large_retry));
1044
1045        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1046    }
1047
1048    #[test]
1049    fn test_sse_event_retry_typical_values() {
1050        let typical_retries = vec![1000, 3000, 5000, 10000, 30000];
1051
1052        for retry_ms in typical_retries {
1053            let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(retry_ms);
1054            assert_eq!(event.retry, Some(retry_ms));
1055
1056            let _axum_event: axum::response::sse::Event = event.into_axum_event();
1057        }
1058    }
1059
1060    #[test]
1061    fn test_sse_event_utf8_emoji_in_data() {
1062        let emoji_data = "Hello 👋 World 🌍";
1063        let event = SseEvent::new(serde_json::json!({"text": emoji_data}));
1064
1065        let stored = event.data.get("text").and_then(|v| v.as_str());
1066        assert_eq!(stored, Some(emoji_data));
1067
1068        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1069    }
1070
1071    #[test]
1072    fn test_sse_event_utf8_chinese_characters() {
1073        let chinese_text = "你好世界";
1074        let event = SseEvent::new(serde_json::json!({"text": chinese_text}));
1075
1076        let stored = event.data.get("text").and_then(|v| v.as_str());
1077        assert_eq!(stored, Some(chinese_text));
1078
1079        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1080    }
1081
1082    #[test]
1083    fn test_sse_event_utf8_arabic_characters() {
1084        let arabic_text = "مرحبا بالعالم";
1085        let event = SseEvent::new(serde_json::json!({"text": arabic_text}));
1086
1087        let stored = event.data.get("text").and_then(|v| v.as_str());
1088        assert_eq!(stored, Some(arabic_text));
1089
1090        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1091    }
1092
1093    #[test]
1094    fn test_sse_event_utf8_mixed_scripts() {
1095        let mixed = "Hello 你好 مرحبا 👋";
1096        let event = SseEvent::new(serde_json::json!({"text": mixed}));
1097
1098        let stored = event.data.get("text").and_then(|v| v.as_str());
1099        assert_eq!(stored, Some(mixed));
1100
1101        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1102    }
1103
1104    #[test]
1105    fn test_sse_event_json_serialization_produces_valid_utf8() {
1106        let event = SseEvent::new(serde_json::json!({"text": "test"}));
1107        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1108    }
1109
1110    #[test]
1111    fn test_sse_event_64kb_payload() {
1112        let large_data = "x".repeat(65536);
1113        let event = SseEvent::new(serde_json::json!({"payload": large_data.clone()}));
1114
1115        let stored = event.data.get("payload").and_then(|v| v.as_str());
1116        assert_eq!(stored.map(|s| s.len()), Some(65536));
1117
1118        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1119    }
1120
1121    #[test]
1122    fn test_sse_event_1mb_payload() {
1123        let large_data = "y".repeat(1_000_000);
1124        let event = SseEvent::new(serde_json::json!({"payload": large_data.clone()}));
1125
1126        let stored = event.data.get("payload").and_then(|v| v.as_str());
1127        assert_eq!(stored.map(|s| s.len()), Some(1_000_000));
1128
1129        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1130    }
1131
1132    #[test]
1133    fn test_sse_event_deeply_nested_json() {
1134        let deeply_nested = serde_json::json!({
1135            "level1": {
1136                "level2": {
1137                    "level3": {
1138                        "level4": {
1139                            "level5": {
1140                                "level6": {
1141                                    "level7": {
1142                                        "value": "deep"
1143                                    }
1144                                }
1145                            }
1146                        }
1147                    }
1148                }
1149            }
1150        });
1151
1152        let event = SseEvent::new(deeply_nested);
1153        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1154    }
1155
1156    #[test]
1157    fn test_sse_event_array_in_data() {
1158        let event = SseEvent::new(serde_json::json!({
1159            "items": [1, 2, 3, 4, 5]
1160        }));
1161
1162        let items = event.data.get("items").and_then(|v| v.as_array());
1163        assert!(items.is_some());
1164        assert_eq!(items.unwrap().len(), 5);
1165
1166        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1167    }
1168
1169    #[test]
1170    fn test_sse_event_null_value_in_data() {
1171        let event = SseEvent::new(serde_json::json!({
1172            "nullable": null
1173        }));
1174
1175        assert!(event.data.get("nullable").unwrap().is_null());
1176
1177        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1178    }
1179
1180    #[test]
1181    fn test_sse_event_boolean_values() {
1182        let event = SseEvent::new(serde_json::json!({
1183            "active": true,
1184            "deleted": false
1185        }));
1186
1187        assert_eq!(event.data.get("active").and_then(|v| v.as_bool()), Some(true));
1188        assert_eq!(event.data.get("deleted").and_then(|v| v.as_bool()), Some(false));
1189
1190        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1191    }
1192
1193    #[tokio::test]
1194    async fn test_sse_last_event_id_header_simulation() {
1195        let producer = RapidEventProducer::new(5);
1196
1197        let mut events = Vec::new();
1198        for _ in 0..5 {
1199            if let Some(evt) = producer.next_event().await {
1200                events.push(evt);
1201            }
1202        }
1203
1204        assert_eq!(events.len(), 5);
1205    }
1206
1207    #[tokio::test]
1208    async fn test_sse_retry_timeout_specification() {
1209        let producer = FullFieldProducer {
1210            sent: AtomicBool::new(false),
1211        };
1212
1213        let event = producer.next_event().await;
1214        assert!(event.is_some());
1215
1216        if let Some(evt) = event {
1217            assert_eq!(evt.retry, Some(5000), "Retry should be 5000ms");
1218        }
1219    }
1220
1221    #[test]
1222    fn test_sse_event_builder_method_chaining() {
1223        let event = SseEvent::new(serde_json::json!({"data": "test"}))
1224            .with_id("id-1")
1225            .with_retry(3000);
1226
1227        assert_eq!(event.id, Some("id-1".to_string()));
1228        assert_eq!(event.retry, Some(3000));
1229
1230        let event2 = SseEvent::with_type("msg", serde_json::json!({"x": 1}))
1231            .with_id("id-2")
1232            .with_retry(5000);
1233
1234        assert_eq!(event2.event_type, Some("msg".to_string()));
1235        assert_eq!(event2.id, Some("id-2".to_string()));
1236        assert_eq!(event2.retry, Some(5000));
1237    }
1238
1239    #[test]
1240    fn test_sse_event_overwriting_fields() {
1241        let event = SseEvent::new(serde_json::json!({"v": 1}))
1242            .with_id("id-original")
1243            .with_retry(1000);
1244
1245        assert_eq!(event.id, Some("id-original".to_string()));
1246        assert_eq!(event.retry, Some(1000));
1247    }
1248
1249    #[test]
1250    fn test_sse_event_type_empty_string() {
1251        let event = SseEvent::with_type("", serde_json::json!({"data": "test"}));
1252        assert_eq!(event.event_type, Some("".to_string()));
1253
1254        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1255    }
1256
1257    #[test]
1258    fn test_sse_event_id_empty_string() {
1259        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_id("");
1260        assert_eq!(event.id, Some("".to_string()));
1261
1262        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1263    }
1264
1265    #[tokio::test]
1266    async fn test_sse_event_sequence_maintains_order() {
1267        let producer = RapidEventProducer::new(10);
1268
1269        let mut event_ids = Vec::new();
1270        for _ in 0..10 {
1271            if let Some(evt) = producer.next_event().await {
1272                if let Some(id) = evt.data.get("id").and_then(|v| v.as_i64()) {
1273                    event_ids.push(id);
1274                }
1275            }
1276        }
1277
1278        for i in 0..event_ids.len() {
1279            assert_eq!(event_ids[i], i as i64, "Event order should match insertion order");
1280        }
1281    }
1282
1283    #[tokio::test]
1284    async fn test_sse_rapid_events_no_loss() {
1285        let producer = RapidEventProducer::new(50);
1286
1287        let mut count = 0;
1288        loop {
1289            match producer.next_event().await {
1290                Some(_) => count += 1,
1291                None => break,
1292            }
1293        }
1294
1295        assert_eq!(count, 50, "All events should be produced without loss");
1296    }
1297
1298    #[tokio::test]
1299    async fn test_sse_event_batching_simulation() {
1300        let producer = RapidEventProducer::new(20);
1301
1302        let mut batch_size = 0;
1303        let mut batch_count = 0;
1304
1305        loop {
1306            match producer.next_event().await {
1307                Some(_evt) => {
1308                    batch_size += 1;
1309                    if batch_size >= 5 {
1310                        batch_count += 1;
1311                        batch_size = 0;
1312                    }
1313                }
1314                None => {
1315                    if batch_size > 0 {
1316                        batch_count += 1;
1317                    }
1318                    break;
1319                }
1320            }
1321        }
1322
1323        assert!(batch_count >= 4, "Should have processed at least 4 batches");
1324    }
1325
1326    #[test]
1327    fn test_sse_state_arc_sharing() {
1328        let producer = TestProducer {
1329            count: AtomicUsize::new(0),
1330        };
1331        let state1 = SseState::new(producer);
1332        let state2 = state1.clone();
1333        let state3 = state2.clone();
1334
1335        assert!(Arc::ptr_eq(&state1.producer, &state2.producer));
1336        assert!(Arc::ptr_eq(&state2.producer, &state3.producer));
1337    }
1338
1339    #[test]
1340    fn test_sse_state_schema_arc_sharing() {
1341        let producer = TestProducer {
1342            count: AtomicUsize::new(0),
1343        };
1344        let schema = serde_json::json!({
1345            "type": "object"
1346        });
1347
1348        let state1 = SseState::with_schema(producer, Some(schema)).expect("schema should be valid");
1349        let state2 = state1.clone();
1350
1351        match (&state1.event_schema, &state2.event_schema) {
1352            (Some(s1), Some(s2)) => {
1353                assert!(Arc::ptr_eq(s1, s2));
1354            }
1355            _ => panic!("Both states should have schema"),
1356        }
1357    }
1358
1359    #[test]
1360    fn test_sse_event_into_axum_event_numeric_data() {
1361        let event = SseEvent::new(serde_json::json!({
1362            "count": 42,
1363            "temperature": 98.6,
1364            "negative": -273
1365        }));
1366
1367        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1368    }
1369
1370    #[test]
1371    fn test_sse_event_json_number_precision() {
1372        let event = SseEvent::new(serde_json::json!({
1373            "float": 3.14159265359,
1374            "large_int": 9007199254740991i64
1375        }));
1376
1377        assert_eq!(event.data.get("float").and_then(|v| v.as_f64()), Some(3.14159265359));
1378
1379        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1380    }
1381
1382    #[test]
1383    fn test_sse_event_string_escaping() {
1384        let event = SseEvent::new(serde_json::json!({
1385            "escaped": "line1\nline2\ttab",
1386            "quotes": "He said \"hello\"",
1387            "backslash": "path\\to\\file"
1388        }));
1389
1390        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1391    }
1392
1393    #[test]
1394    fn test_sse_event_all_json_types_combined() {
1395        let event = SseEvent::new(serde_json::json!({
1396            "string": "text",
1397            "number": 123,
1398            "float": 1.5,
1399            "boolean": true,
1400            "null_value": null,
1401            "array": [1, 2, 3],
1402            "object": {
1403                "nested": "value"
1404            }
1405        }));
1406
1407        let _axum_event: axum::response::sse::Event = event.into_axum_event();
1408    }
1409}