1use axum::{
6 extract::State,
7 response::{
8 IntoResponse,
9 sse::{Event, KeepAlive, Sse},
10 },
11};
12use futures_util::stream;
13use std::{convert::Infallible, sync::Arc, time::Duration};
14use tracing::{debug, error, info};
15
16pub trait SseEventProducer: Send + Sync {
67 fn next_event(&self) -> impl std::future::Future<Output = Option<SseEvent>> + Send;
76
77 fn on_connect(&self) -> impl std::future::Future<Output = ()> + Send {
82 async {}
83 }
84
85 fn on_disconnect(&self) -> impl std::future::Future<Output = ()> + Send {
90 async {}
91 }
92}
93
94#[derive(Debug, Clone, serde::Serialize, serde::Deserialize)]
116pub struct SseEvent {
117 pub event_type: Option<String>,
119 pub data: serde_json::Value,
121 pub id: Option<String>,
123 pub retry: Option<u64>,
125}
126
127impl SseEvent {
128 pub fn new(data: serde_json::Value) -> Self {
145 Self {
146 event_type: None,
147 data,
148 id: None,
149 retry: None,
150 }
151 }
152
153 pub fn with_type(event_type: impl Into<String>, data: serde_json::Value) -> Self {
172 Self {
173 event_type: Some(event_type.into()),
174 data,
175 id: None,
176 retry: None,
177 }
178 }
179
180 pub fn with_id(mut self, id: impl Into<String>) -> Self {
198 self.id = Some(id.into());
199 self
200 }
201
202 pub fn with_retry(mut self, retry_ms: u64) -> Self {
220 self.retry = Some(retry_ms);
221 self
222 }
223
224 fn into_axum_event(self) -> Event {
226 let json_data = match serde_json::to_string(&self.data) {
227 Ok(json) => json,
228 Err(e) => {
229 error!("Failed to serialize SSE event data: {}", e);
230 "null".to_string()
231 }
232 };
233
234 let mut event = Event::default().data(json_data);
235
236 if let Some(event_type) = self.event_type {
237 event = event.event(event_type);
238 }
239
240 if let Some(id) = self.id {
241 event = event.id(id);
242 }
243
244 if let Some(retry) = self.retry {
245 event = event.retry(Duration::from_millis(retry));
246 }
247
248 event
249 }
250}
251
252pub struct SseState<P: SseEventProducer> {
257 producer: Arc<P>,
259 event_schema: Option<Arc<jsonschema::Validator>>,
261}
262
263impl<P: SseEventProducer> Clone for SseState<P> {
264 fn clone(&self) -> Self {
265 Self {
266 producer: Arc::clone(&self.producer),
267 event_schema: self.event_schema.clone(),
268 }
269 }
270}
271
272impl<P: SseEventProducer + 'static> SseState<P> {
273 pub fn new(producer: P) -> Self {
287 Self {
288 producer: Arc::new(producer),
289 event_schema: None,
290 }
291 }
292
293 pub fn with_schema(producer: P, event_schema: Option<serde_json::Value>) -> Result<Self, String> {
321 let event_validator = if let Some(schema) = event_schema {
322 Some(Arc::new(
323 jsonschema::validator_for(&schema).map_err(|e| format!("Invalid event schema: {}", e))?,
324 ))
325 } else {
326 None
327 };
328
329 Ok(Self {
330 producer: Arc::new(producer),
331 event_schema: event_validator,
332 })
333 }
334}
335
336pub async fn sse_handler<P: SseEventProducer + 'static>(State(state): State<SseState<P>>) -> impl IntoResponse {
365 info!("SSE client connected");
366
367 state.producer.on_connect().await;
368
369 let producer = Arc::clone(&state.producer);
370 let event_schema = state.event_schema.clone();
371 let stream = stream::unfold((producer, event_schema), |(producer, event_schema)| async move {
372 match producer.next_event().await {
373 Some(sse_event) => {
374 debug!("Sending SSE event: {:?}", sse_event.event_type);
375
376 if let Some(validator) = &event_schema
377 && !validator.is_valid(&sse_event.data)
378 {
379 error!("SSE event validation failed");
380 return Some((
381 Ok::<_, Infallible>(Event::default().data("validation_error")),
382 (producer, event_schema),
383 ));
384 }
385
386 let event = sse_event.into_axum_event();
387 Some((Ok::<_, Infallible>(event), (producer, event_schema)))
388 }
389 None => {
390 info!("SSE stream ended");
391 producer.on_disconnect().await;
392 None
393 }
394 }
395 });
396
397 let sse_response =
398 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)).text("keep-alive"));
399
400 sse_response.into_response()
401}
402
403#[cfg(test)]
404mod tests {
405 use super::*;
406 use std::sync::Arc;
407 use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};
408
409 struct TestProducer {
410 count: AtomicUsize,
411 }
412
413 impl SseEventProducer for TestProducer {
414 async fn next_event(&self) -> Option<SseEvent> {
415 let count = self.count.fetch_add(1, Ordering::Relaxed);
416 if count < 3 {
417 Some(SseEvent::new(serde_json::json!({
418 "message": format!("Event {}", count)
419 })))
420 } else {
421 None
422 }
423 }
424 }
425
426 struct LifecycleProducer {
428 connect_count: Arc<AtomicUsize>,
429 disconnect_count: Arc<AtomicUsize>,
430 event_count: AtomicUsize,
431 }
432
433 impl LifecycleProducer {
434 fn new(connect: Arc<AtomicUsize>, disconnect: Arc<AtomicUsize>) -> Self {
435 Self {
436 connect_count: connect,
437 disconnect_count: disconnect,
438 event_count: AtomicUsize::new(0),
439 }
440 }
441 }
442
443 impl SseEventProducer for LifecycleProducer {
444 async fn next_event(&self) -> Option<SseEvent> {
445 let idx: usize = self.event_count.fetch_add(1, Ordering::Relaxed);
446 if idx < 2 {
447 Some(SseEvent::new(serde_json::json!({"event": idx})))
448 } else {
449 None
450 }
451 }
452
453 async fn on_connect(&self) {
454 self.connect_count.fetch_add(1, Ordering::Relaxed);
455 }
456
457 async fn on_disconnect(&self) {
458 self.disconnect_count.fetch_add(1, Ordering::Relaxed);
459 }
460 }
461
462 struct MultilineProducer {
464 sent: AtomicBool,
465 }
466
467 impl SseEventProducer for MultilineProducer {
468 async fn next_event(&self) -> Option<SseEvent> {
469 let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
470 if !was_sent {
471 Some(SseEvent::new(serde_json::json!({
472 "text": "line1\nline2\nline3"
473 })))
474 } else {
475 None
476 }
477 }
478 }
479
480 struct SpecialCharsProducer {
482 sent: AtomicBool,
483 }
484
485 impl SseEventProducer for SpecialCharsProducer {
486 async fn next_event(&self) -> Option<SseEvent> {
487 let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
488 if !was_sent {
489 Some(SseEvent::new(serde_json::json!({
490 "data": "special: \"quotes\", \\ backslash, \t tab, \r\n crlf"
491 })))
492 } else {
493 None
494 }
495 }
496 }
497
498 struct LargePayloadProducer {
500 sent: AtomicBool,
501 }
502
503 impl SseEventProducer for LargePayloadProducer {
504 async fn next_event(&self) -> Option<SseEvent> {
505 let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
506 if !was_sent {
507 let large_string: String = "x".repeat(100_000);
508 Some(SseEvent::new(serde_json::json!({
509 "payload": large_string
510 })))
511 } else {
512 None
513 }
514 }
515 }
516
517 struct RapidEventProducer {
519 event_count: usize,
520 current: AtomicUsize,
521 }
522
523 impl RapidEventProducer {
524 fn new(count: usize) -> Self {
525 Self {
526 event_count: count,
527 current: AtomicUsize::new(0),
528 }
529 }
530 }
531
532 impl SseEventProducer for RapidEventProducer {
533 async fn next_event(&self) -> Option<SseEvent> {
534 let idx: usize = self.current.fetch_add(1, Ordering::Relaxed);
535 if idx < self.event_count {
536 Some(SseEvent::new(serde_json::json!({
537 "id": idx,
538 "data": format!("event_{}", idx)
539 })))
540 } else {
541 None
542 }
543 }
544 }
545
546 struct FullFieldProducer {
548 sent: AtomicBool,
549 }
550
551 impl SseEventProducer for FullFieldProducer {
552 async fn next_event(&self) -> Option<SseEvent> {
553 let was_sent: bool = self.sent.swap(true, Ordering::Relaxed);
554 if !was_sent {
555 Some(
556 SseEvent::with_type(
557 "counter_update",
558 serde_json::json!({
559 "count": 42,
560 "status": "active"
561 }),
562 )
563 .with_id("event-123")
564 .with_retry(5000),
565 )
566 } else {
567 None
568 }
569 }
570 }
571
572 struct NoEventProducer;
574
575 impl SseEventProducer for NoEventProducer {
576 async fn next_event(&self) -> Option<SseEvent> {
577 None
578 }
579 }
580
/// `SseEvent::new` leaves type, id and retry unset.
#[test]
fn test_sse_event_creation_minimal() {
    let SseEvent {
        event_type, id, retry, ..
    } = SseEvent::new(serde_json::json!({"test": "data"}));

    assert_eq!(event_type, None);
    assert_eq!(id, None);
    assert_eq!(retry, None);
}
588
/// Type, id and retry supplied through the builders are all stored.
#[test]
fn test_sse_event_with_all_fields() {
    let built = SseEvent::with_type("update", serde_json::json!({"count": 42}))
        .with_id("event-001")
        .with_retry(3000);

    assert_eq!(built.event_type.as_deref(), Some("update"));
    assert_eq!(built.id.as_deref(), Some("event-001"));
    assert_eq!(built.retry, Some(3000));
}
599
/// Chained builder calls produce an event with every field populated.
#[test]
fn test_sse_event_builder_pattern() {
    let payload = serde_json::json!({"text": "hello"});
    let SseEvent {
        event_type, id, retry, ..
    } = SseEvent::with_type("notification", payload)
        .with_id("notif-456")
        .with_retry(5000);

    assert_eq!(event_type, Some(String::from("notification")));
    assert_eq!(id, Some(String::from("notif-456")));
    assert_eq!(retry, Some(5000));
}
610
/// Line feeds inside a payload string are stored unchanged.
#[test]
fn test_sse_event_multiline_data() {
    let payload = serde_json::json!({
        "text": "line1\nline2\nline3"
    });
    let event = SseEvent::new(payload);

    assert!(event.data.is_object());
    assert_eq!(event.data["text"].as_str(), Some("line1\nline2\nline3"));
}
621
/// A payload containing quotes and a backslash is still stored as an object.
#[test]
fn test_sse_event_special_characters() {
    let payload = serde_json::json!({
        "data": "special: \"quotes\", \\ backslash"
    });

    assert!(SseEvent::new(payload).data.is_object());
}
630
/// A 100,000-character payload string is stored at full length.
#[test]
fn test_sse_event_large_payload() {
    let event = SseEvent::new(serde_json::json!({
        "payload": "x".repeat(100_000)
    }));

    let stored_len = event
        .data
        .get("payload")
        .and_then(|v| v.as_str())
        .map(|s| s.len());
    assert_eq!(stored_len, Some(100_000));
}
641
/// Converting a minimal event into an axum event does not panic.
#[test]
fn test_sse_event_into_axum_event_conversion() {
    let payload = serde_json::json!({"msg": "test"});
    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
647
/// Converting a fully populated event into an axum event does not panic.
#[test]
fn test_sse_event_into_axum_with_all_fields() {
    let populated = SseEvent::with_type("event", serde_json::json!({"id": 1}))
        .with_id("123")
        .with_retry(5000);

    let _converted: axum::response::sse::Event = populated.into_axum_event();
}
656
/// Cloning state shares the same producer allocation.
#[test]
fn test_sse_state_creation() {
    let state = SseState::new(TestProducer {
        count: AtomicUsize::new(0),
    });
    let copy = state.clone();

    assert!(Arc::ptr_eq(&state.producer, &copy.producer));
}
666
/// A well-formed object schema is accepted.
#[test]
fn test_sse_state_with_schema_valid() {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {
            "message": {"type": "string"}
        }
    });
    let producer = TestProducer {
        count: AtomicUsize::new(0),
    };

    assert!(SseState::with_schema(producer, Some(schema)).is_ok());
}
682
/// A schema with an unknown `type` keyword value is rejected.
#[test]
fn test_sse_state_with_invalid_schema() {
    let bad_schema = serde_json::json!({
        "type": "not-a-valid-type"
    });
    let producer = TestProducer {
        count: AtomicUsize::new(0),
    };

    assert!(SseState::with_schema(producer, Some(bad_schema)).is_err());
}
695
/// Passing no schema to `with_schema` succeeds.
#[test]
fn test_sse_state_with_schema_none() {
    let outcome = SseState::with_schema(
        TestProducer {
            count: AtomicUsize::new(0),
        },
        None,
    );

    assert!(outcome.is_ok());
}
704
705 #[tokio::test]
706 async fn test_sse_lifecycle_on_connect_called() {
707 let connect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
708 let disconnect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
709
710 let producer: LifecycleProducer =
711 LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));
712
713 producer.on_connect().await;
714 assert_eq!(connect_count.load(Ordering::Relaxed), 1);
715 }
716
717 #[tokio::test]
718 async fn test_sse_lifecycle_on_disconnect_called() {
719 let connect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
720 let disconnect_count: Arc<AtomicUsize> = Arc::new(AtomicUsize::new(0));
721
722 let producer: LifecycleProducer =
723 LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));
724
725 producer.on_disconnect().await;
726 assert_eq!(disconnect_count.load(Ordering::Relaxed), 1);
727 }
728
729 #[tokio::test]
730 async fn test_sse_event_ordering_preserved() {
731 let producer: RapidEventProducer = RapidEventProducer::new(10);
732
733 let mut last_idx: i32 = -1;
734 for _ in 0..10 {
735 if let Some(event) = producer.next_event().await {
736 if let Some(id) = event.data.get("id").and_then(|v| v.as_i64()) {
737 assert!(id as i32 > last_idx, "Event ordering violated");
738 last_idx = id as i32;
739 }
740 }
741 }
742 }
743
744 #[tokio::test]
745 async fn test_sse_rapid_event_sending() {
746 let producer: RapidEventProducer = RapidEventProducer::new(100);
747
748 let mut count: usize = 0;
749 loop {
750 match producer.next_event().await {
751 Some(_event) => count += 1,
752 None => break,
753 }
754 }
755
756 assert_eq!(count, 100);
757 }
758
/// An empty JSON object is kept as an object payload.
#[test]
fn test_sse_event_with_empty_data_object() {
    let empty = serde_json::json!({});
    assert!(SseEvent::new(empty).data.is_object());
}
764
/// A value nested three levels deep can be read back from the payload.
#[test]
fn test_sse_event_with_nested_data() {
    let event = SseEvent::new(serde_json::json!({
        "nested": {
            "deep": {
                "value": "found"
            }
        }
    }));

    let found = event
        .data
        .pointer("/nested/deep/value")
        .and_then(|v| v.as_str());

    assert_eq!(found, Some("found"));
}
784
785 #[tokio::test]
786 async fn test_sse_producer_stream_ends_cleanly() {
787 let producer: NoEventProducer = NoEventProducer;
788
789 let event1: Option<SseEvent> = producer.next_event().await;
790 assert!(event1.is_none());
791
792 let event2: Option<SseEvent> = producer.next_event().await;
793 assert!(event2.is_none());
794 }
795
/// A cloned event matches the original field by field.
#[test]
fn test_sse_event_clone() {
    let original = SseEvent::with_type("test", serde_json::json!({"data": "test"}))
        .with_id("id-1")
        .with_retry(2000);

    let SseEvent {
        event_type,
        data,
        id,
        retry,
    } = original.clone();

    assert_eq!(event_type, original.event_type);
    assert_eq!(id, original.id);
    assert_eq!(retry, original.retry);
    assert_eq!(data, original.data);
}
809
/// The `Debug` output names the type.
#[test]
fn test_sse_event_debug_impl() {
    let rendered = format!("{:?}", SseEvent::new(serde_json::json!({"msg": "debug"})));
    assert!(rendered.contains("SseEvent"));
}
816
817 #[tokio::test]
818 async fn test_sse_multiple_producers_independent() {
819 let producer1: TestProducer = TestProducer {
820 count: AtomicUsize::new(0),
821 };
822 let producer2: TestProducer = TestProducer {
823 count: AtomicUsize::new(0),
824 };
825
826 let _event1: Option<SseEvent> = producer1.next_event().await;
827 let _event2: Option<SseEvent> = producer2.next_event().await;
828
829 let count1: usize = producer1.count.load(Ordering::Relaxed);
830 let count2: usize = producer2.count.load(Ordering::Relaxed);
831
832 assert_eq!(count1, 1);
833 assert_eq!(count2, 1);
834 }
835
/// Cloning state shares both the producer and the compiled schema.
#[test]
fn test_sse_state_cloning_preserves_schema() {
    let schema = serde_json::json!({
        "type": "object",
        "properties": {
            "message": {"type": "string"}
        }
    });
    let producer = TestProducer {
        count: AtomicUsize::new(0),
    };

    let state = SseState::with_schema(producer, Some(schema)).expect("schema should be valid");
    let copy = state.clone();

    assert!(Arc::ptr_eq(&state.producer, &copy.producer));

    let (Some(original_schema), Some(copied_schema)) = (&state.event_schema, &copy.event_schema) else {
        panic!("Schema should be preserved in clone");
    };
    assert!(Arc::ptr_eq(original_schema, copied_schema));
}
860
861 #[tokio::test]
862 async fn test_sse_large_payload_integrity() {
863 let producer: LargePayloadProducer = LargePayloadProducer {
864 sent: AtomicBool::new(false),
865 };
866
867 let event: Option<SseEvent> = producer.next_event().await;
868 assert!(event.is_some());
869
870 if let Some(evt) = event {
871 let payload: Option<&str> = evt.data.get("payload").and_then(|v| v.as_str());
872 assert_eq!(payload.map(|s| s.len()), Some(100_000));
873 }
874 }
875
876 #[tokio::test]
877 async fn test_sse_multiline_data_preservation() {
878 let producer: MultilineProducer = MultilineProducer {
879 sent: AtomicBool::new(false),
880 };
881
882 let event: Option<SseEvent> = producer.next_event().await;
883 assert!(event.is_some());
884
885 if let Some(evt) = event {
886 let text: Option<&str> = evt.data.get("text").and_then(|v| v.as_str());
887 assert_eq!(text, Some("line1\nline2\nline3"));
888 }
889 }
890
891 #[tokio::test]
892 async fn test_sse_special_chars_in_payload() {
893 let producer: SpecialCharsProducer = SpecialCharsProducer {
894 sent: AtomicBool::new(false),
895 };
896
897 let event: Option<SseEvent> = producer.next_event().await;
898 assert!(event.is_some());
899
900 if let Some(evt) = event {
901 let data: Option<&str> = evt.data.get("data").and_then(|v| v.as_str());
902 assert!(data.is_some());
903 assert!(data.unwrap().contains("quotes"));
904 }
905 }
906
907 #[tokio::test]
908 async fn test_sse_full_event_fields_together() {
909 let producer: FullFieldProducer = FullFieldProducer {
910 sent: AtomicBool::new(false),
911 };
912
913 let event: Option<SseEvent> = producer.next_event().await;
914 assert!(event.is_some());
915
916 if let Some(evt) = event {
917 assert_eq!(evt.event_type, Some("counter_update".to_string()));
918 assert_eq!(evt.id, Some("event-123".to_string()));
919 assert_eq!(evt.retry, Some(5000));
920 assert_eq!(evt.data.get("count").and_then(|v| v.as_i64()), Some(42));
921 }
922 }
923
/// Converting a simple key/value payload into an axum event does not panic.
#[test]
fn test_sse_event_to_axum_preserves_data() {
    let payload = serde_json::json!({"key": "value"});
    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
929
/// A data-only event has no metadata and still converts to an axum event.
#[test]
fn test_sse_event_data_only_no_metadata() {
    let bare = SseEvent::new(serde_json::json!({"message": "hello"}));

    assert_eq!(bare.event_type, None, "event_type should be None");
    assert_eq!(bare.id, None, "id should be None");
    assert_eq!(bare.retry, None, "retry should be None");

    let _converted: axum::response::sse::Event = bare.into_axum_event();
}
939
/// A fully populated event exposes its fields and converts to an axum event.
#[test]
fn test_sse_event_with_all_fields_filled() {
    let populated = SseEvent::with_type("update", serde_json::json!({"status": "ok"}))
        .with_id("evt-999")
        .with_retry(10000);

    assert_eq!(populated.event_type.as_deref(), Some("update"));
    assert_eq!(populated.id.as_deref(), Some("evt-999"));
    assert_eq!(populated.retry, Some(10000));

    let _converted: axum::response::sse::Event = populated.into_axum_event();
}
952
/// An empty object payload has zero keys and converts to an axum event.
#[test]
fn test_sse_event_empty_data_field() {
    let empty = SseEvent::new(serde_json::json!({}));

    assert!(empty.data.is_object());
    assert_eq!(empty.data.as_object().map(|fields| fields.len()), Some(0));

    let _converted: axum::response::sse::Event = empty.into_axum_event();
}
961
/// A string with line feeds is stored unchanged and the event converts.
#[test]
fn test_sse_event_data_with_newlines_in_string() {
    let text = "first line\nsecond line\nthird line";
    let event = SseEvent::new(serde_json::json!({"text": text}));

    assert_eq!(event.data["text"].as_str(), Some(text));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
972
/// A string containing colons is stored unchanged and the event converts.
#[test]
fn test_sse_event_data_with_colons() {
    let text = "key1: value1, key2: value2";
    let event = SseEvent::new(serde_json::json!({"data": text}));

    assert_eq!(event.data["data"].as_str(), Some(text));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
983
/// A payload with a `comment` key converts to an axum event without panicking.
#[test]
fn test_sse_event_comment_only_structure() {
    let payload = serde_json::json!({"comment": "this is a comment"});
    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
989
/// An event type containing spaces is stored verbatim and converts.
#[test]
fn test_sse_event_type_with_spaces() {
    let spaced = SseEvent::with_type("event type with spaces", serde_json::json!({"data": "test"}));

    assert_eq!(spaced.event_type.as_deref(), Some("event type with spaces"));

    let _converted: axum::response::sse::Event = spaced.into_axum_event();
}
997
/// Event types with dashes, underscores and dots are stored verbatim.
#[test]
fn test_sse_event_type_with_special_chars() {
    for name in ["update-v2", "event_123", "message.sent", "type-with-dash"] {
        let event = SseEvent::with_type(name, serde_json::json!({"data": "test"}));
        assert_eq!(event.event_type.as_deref(), Some(name));

        let _converted: axum::response::sse::Event = event.into_axum_event();
    }
}
1009
/// Ids made of letters, digits, dashes and underscores are stored verbatim.
#[test]
fn test_sse_event_id_alphanumeric() {
    for candidate in ["123", "abc-def", "event_001", "id-with-dashes-123"] {
        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_id(candidate);
        assert_eq!(event.id.as_deref(), Some(candidate));

        let _converted: axum::response::sse::Event = event.into_axum_event();
    }
}
1021
/// A retry of zero milliseconds is stored and converts.
#[test]
fn test_sse_event_retry_zero() {
    let payload = serde_json::json!({"data": "test"});
    let immediate = SseEvent::new(payload).with_retry(0);

    assert_eq!(immediate.retry, Some(0));

    let _converted: axum::response::sse::Event = immediate.into_axum_event();
}
1029
/// A 100 ms retry is stored and converts.
#[test]
fn test_sse_event_retry_small_value() {
    let payload = serde_json::json!({"data": "test"});
    let quick = SseEvent::new(payload).with_retry(100);

    assert_eq!(quick.retry, Some(100));

    let _converted: axum::response::sse::Event = quick.into_axum_event();
}
1037
/// A very large retry (`u64::MAX / 2`) is stored and converts.
#[test]
fn test_sse_event_retry_large_value() {
    let huge = u64::MAX / 2;
    let slow = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(huge);

    assert_eq!(slow.retry, Some(huge));

    let _converted: axum::response::sse::Event = slow.into_axum_event();
}
1046
/// Common retry intervals are stored and convert.
#[test]
fn test_sse_event_retry_typical_values() {
    for interval in [1000_u64, 3000, 5000, 10000, 30000] {
        let event = SseEvent::new(serde_json::json!({"data": "test"})).with_retry(interval);
        assert_eq!(event.retry, Some(interval));

        let _converted: axum::response::sse::Event = event.into_axum_event();
    }
}
1058
/// Emoji in the payload are stored unchanged and the event converts.
#[test]
fn test_sse_event_utf8_emoji_in_data() {
    let text = "Hello 👋 World 🌍";
    let event = SseEvent::new(serde_json::json!({"text": text}));

    assert_eq!(event.data["text"].as_str(), Some(text));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1069
/// Chinese text in the payload is stored unchanged and the event converts.
#[test]
fn test_sse_event_utf8_chinese_characters() {
    let text = "你好世界";
    let event = SseEvent::new(serde_json::json!({"text": text}));

    assert_eq!(event.data["text"].as_str(), Some(text));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1080
/// Arabic text in the payload is stored unchanged and the event converts.
#[test]
fn test_sse_event_utf8_arabic_characters() {
    let text = "مرحبا بالعالم";
    let event = SseEvent::new(serde_json::json!({"text": text}));

    assert_eq!(event.data["text"].as_str(), Some(text));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1091
/// Mixed-script text in the payload is stored unchanged and the event converts.
#[test]
fn test_sse_event_utf8_mixed_scripts() {
    let text = "Hello 你好 مرحبا 👋";
    let event = SseEvent::new(serde_json::json!({"text": text}));

    assert_eq!(event.data["text"].as_str(), Some(text));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1102
/// Converting a plain text payload into an axum event does not panic.
#[test]
fn test_sse_event_json_serialization_produces_valid_utf8() {
    let payload = serde_json::json!({"text": "test"});
    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
1108
/// A 64 KiB payload string is stored at full length and the event converts.
#[test]
fn test_sse_event_64kb_payload() {
    let event = SseEvent::new(serde_json::json!({"payload": "x".repeat(65536)}));

    let stored_len = event.data["payload"].as_str().map(|s| s.len());
    assert_eq!(stored_len, Some(65536));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1119
/// A one-million-character payload is stored at full length and converts.
#[test]
fn test_sse_event_1mb_payload() {
    let event = SseEvent::new(serde_json::json!({"payload": "y".repeat(1_000_000)}));

    let stored_len = event.data["payload"].as_str().map(|s| s.len());
    assert_eq!(stored_len, Some(1_000_000));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1130
/// A payload nested seven levels deep converts to an axum event.
#[test]
fn test_sse_event_deeply_nested_json() {
    let payload = serde_json::json!({
        "level1": {
            "level2": {
                "level3": {
                    "level4": {
                        "level5": {
                            "level6": {
                                "level7": {
                                    "value": "deep"
                                }
                            }
                        }
                    }
                }
            }
        }
    });

    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
1154
/// An array inside the payload keeps all five elements.
#[test]
fn test_sse_event_array_in_data() {
    let event = SseEvent::new(serde_json::json!({
        "items": [1, 2, 3, 4, 5]
    }));

    let item_count = event
        .data
        .get("items")
        .and_then(|v| v.as_array())
        .map(|list| list.len());
    assert_eq!(item_count, Some(5));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1167
/// A JSON `null` inside the payload is kept as a present, null value.
#[test]
fn test_sse_event_null_value_in_data() {
    let event = SseEvent::new(serde_json::json!({
        "nullable": null
    }));

    assert_eq!(event.data.get("nullable"), Some(&serde_json::Value::Null));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1178
/// Boolean payload values are read back with their original truth value.
#[test]
fn test_sse_event_boolean_values() {
    let event = SseEvent::new(serde_json::json!({
        "active": true,
        "deleted": false
    }));

    assert_eq!(event.data["active"].as_bool(), Some(true));
    assert_eq!(event.data["deleted"].as_bool(), Some(false));

    let _converted: axum::response::sse::Event = event.into_axum_event();
}
1191
1192 #[tokio::test]
1193 async fn test_sse_last_event_id_header_simulation() {
1194 let producer = RapidEventProducer::new(5);
1195
1196 let mut events = Vec::new();
1197 for _ in 0..5 {
1198 if let Some(evt) = producer.next_event().await {
1199 events.push(evt);
1200 }
1201 }
1202
1203 assert_eq!(events.len(), 5);
1204 }
1205
1206 #[tokio::test]
1207 async fn test_sse_retry_timeout_specification() {
1208 let producer = FullFieldProducer {
1209 sent: AtomicBool::new(false),
1210 };
1211
1212 let event = producer.next_event().await;
1213 assert!(event.is_some());
1214
1215 if let Some(evt) = event {
1216 assert_eq!(evt.retry, Some(5000), "Retry should be 5000ms");
1217 }
1218 }
1219
/// Builder chaining works from both `new` and `with_type`.
#[test]
fn test_sse_event_builder_method_chaining() {
    let untyped = SseEvent::new(serde_json::json!({"data": "test"}))
        .with_id("id-1")
        .with_retry(3000);

    assert_eq!(untyped.id.as_deref(), Some("id-1"));
    assert_eq!(untyped.retry, Some(3000));

    let typed = SseEvent::with_type("msg", serde_json::json!({"x": 1}))
        .with_id("id-2")
        .with_retry(5000);

    assert_eq!(typed.event_type.as_deref(), Some("msg"));
    assert_eq!(typed.id.as_deref(), Some("id-2"));
    assert_eq!(typed.retry, Some(5000));
}
1237
/// Calling a builder method a second time replaces the earlier value.
///
/// The original assertions on the first values are kept. The second round of
/// builder calls exercises the overwrite that the test name promises.
#[test]
fn test_sse_event_overwriting_fields() {
    let original = SseEvent::new(serde_json::json!({"v": 1}))
        .with_id("id-original")
        .with_retry(1000);

    assert_eq!(original.id, Some("id-original".to_string()));
    assert_eq!(original.retry, Some(1000));

    let replaced = original.with_id("id-replaced").with_retry(2000);

    assert_eq!(replaced.id, Some("id-replaced".to_string()));
    assert_eq!(replaced.retry, Some(2000));
}
1247
/// An empty event type is stored as `Some("")` and converts.
#[test]
fn test_sse_event_type_empty_string() {
    let blank = SseEvent::with_type("", serde_json::json!({"data": "test"}));

    assert_eq!(blank.event_type.as_deref(), Some(""));

    let _converted: axum::response::sse::Event = blank.into_axum_event();
}
1255
/// An empty id is stored as `Some("")` and converts.
#[test]
fn test_sse_event_id_empty_string() {
    let blank = SseEvent::new(serde_json::json!({"data": "test"})).with_id("");

    assert_eq!(blank.id.as_deref(), Some(""));

    let _converted: axum::response::sse::Event = blank.into_axum_event();
}
1263
1264 #[tokio::test]
1265 async fn test_sse_event_sequence_maintains_order() {
1266 let producer = RapidEventProducer::new(10);
1267
1268 let mut event_ids = Vec::new();
1269 for _ in 0..10 {
1270 if let Some(evt) = producer.next_event().await {
1271 if let Some(id) = evt.data.get("id").and_then(|v| v.as_i64()) {
1272 event_ids.push(id);
1273 }
1274 }
1275 }
1276
1277 for i in 0..event_ids.len() {
1278 assert_eq!(event_ids[i], i as i64, "Event order should match insertion order");
1279 }
1280 }
1281
1282 #[tokio::test]
1283 async fn test_sse_rapid_events_no_loss() {
1284 let producer = RapidEventProducer::new(50);
1285
1286 let mut count = 0;
1287 loop {
1288 match producer.next_event().await {
1289 Some(_) => count += 1,
1290 None => break,
1291 }
1292 }
1293
1294 assert_eq!(count, 50, "All events should be produced without loss");
1295 }
1296
1297 #[tokio::test]
1298 async fn test_sse_event_batching_simulation() {
1299 let producer = RapidEventProducer::new(20);
1300
1301 let mut batch_size = 0;
1302 let mut batch_count = 0;
1303
1304 loop {
1305 match producer.next_event().await {
1306 Some(_evt) => {
1307 batch_size += 1;
1308 if batch_size >= 5 {
1309 batch_count += 1;
1310 batch_size = 0;
1311 }
1312 }
1313 None => {
1314 if batch_size > 0 {
1315 batch_count += 1;
1316 }
1317 break;
1318 }
1319 }
1320 }
1321
1322 assert!(batch_count >= 4, "Should have processed at least 4 batches");
1323 }
1324
/// Successive clones of state all share one producer allocation.
#[test]
fn test_sse_state_arc_sharing() {
    let first = SseState::new(TestProducer {
        count: AtomicUsize::new(0),
    });
    let second = first.clone();
    let third = second.clone();

    assert!(Arc::ptr_eq(&first.producer, &second.producer));
    assert!(Arc::ptr_eq(&second.producer, &third.producer));
}
1337
/// A cloned state shares the compiled schema with the original.
#[test]
fn test_sse_state_schema_arc_sharing() {
    let schema = serde_json::json!({
        "type": "object"
    });
    let producer = TestProducer {
        count: AtomicUsize::new(0),
    };

    let first = SseState::with_schema(producer, Some(schema)).expect("schema should be valid");
    let second = first.clone();

    let (Some(first_schema), Some(second_schema)) = (&first.event_schema, &second.event_schema) else {
        panic!("Both states should have schema");
    };
    assert!(Arc::ptr_eq(first_schema, second_schema));
}
1357
/// Integer, float and negative numbers in the payload convert without panicking.
#[test]
fn test_sse_event_into_axum_event_numeric_data() {
    let payload = serde_json::json!({
        "count": 42,
        "temperature": 98.6,
        "negative": -273
    });

    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
1368
/// A full-precision float and a 53-bit integer are read back unchanged.
///
/// The float uses `std::f64::consts::PI` instead of a hand-typed
/// approximation, which Clippy's deny-by-default `approx_constant` lint
/// rejects. The `large_int` value is now asserted as well.
#[test]
fn test_sse_event_json_number_precision() {
    let event = SseEvent::new(serde_json::json!({
        "float": std::f64::consts::PI,
        "large_int": 9007199254740991i64
    }));

    assert_eq!(
        event.data.get("float").and_then(|v| v.as_f64()),
        Some(std::f64::consts::PI)
    );
    assert_eq!(
        event.data.get("large_int").and_then(|v| v.as_i64()),
        Some(9007199254740991i64)
    );

    let _axum_event: axum::response::sse::Event = event.into_axum_event();
}
1380
/// Strings needing JSON escaping convert to an axum event without panicking.
#[test]
fn test_sse_event_string_escaping() {
    let payload = serde_json::json!({
        "escaped": "line1\nline2\ttab",
        "quotes": "He said \"hello\"",
        "backslash": "path\\to\\file"
    });

    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
1391
/// A payload mixing every JSON value kind converts without panicking.
#[test]
fn test_sse_event_all_json_types_combined() {
    let payload = serde_json::json!({
        "string": "text",
        "number": 123,
        "float": 1.5,
        "boolean": true,
        "null_value": null,
        "array": [1, 2, 3],
        "object": {
            "nested": "value"
        }
    });

    let _converted: axum::response::sse::Event = SseEvent::new(payload).into_axum_event();
}
1408}