1use axum::{
6 extract::State,
7 response::{
8 IntoResponse,
9 sse::{Event, KeepAlive, Sse},
10 },
11};
12use futures_util::stream;
13use serde_json::Value;
14use std::{convert::Infallible, sync::Arc, time::Duration};
15use tracing::{debug, error, info};
16
17pub trait SseEventProducer: Send + Sync {
68 fn next_event(&self) -> impl std::future::Future<Output = Option<SseEvent>> + Send;
77
78 fn on_connect(&self) -> impl std::future::Future<Output = ()> + Send {
83 async {}
84 }
85
86 fn on_disconnect(&self) -> impl std::future::Future<Output = ()> + Send {
91 async {}
92 }
93}
94
95#[derive(Debug, Clone)]
117pub struct SseEvent {
118 pub event_type: Option<String>,
120 pub data: Value,
122 pub id: Option<String>,
124 pub retry: Option<u64>,
126}
127
128impl SseEvent {
129 pub fn new(data: Value) -> Self {
146 Self {
147 event_type: None,
148 data,
149 id: None,
150 retry: None,
151 }
152 }
153
154 pub fn with_type(event_type: impl Into<String>, data: Value) -> Self {
173 Self {
174 event_type: Some(event_type.into()),
175 data,
176 id: None,
177 retry: None,
178 }
179 }
180
181 pub fn with_id(mut self, id: impl Into<String>) -> Self {
199 self.id = Some(id.into());
200 self
201 }
202
203 pub fn with_retry(mut self, retry_ms: u64) -> Self {
221 self.retry = Some(retry_ms);
222 self
223 }
224
225 fn into_axum_event(self) -> Event {
227 let json_data = match serde_json::to_string(&self.data) {
228 Ok(json) => json,
229 Err(e) => {
230 error!("Failed to serialize SSE event data: {}", e);
231 "null".to_string()
232 }
233 };
234
235 let mut event = Event::default().data(json_data);
236
237 if let Some(event_type) = self.event_type {
238 event = event.event(event_type);
239 }
240
241 if let Some(id) = self.id {
242 event = event.id(id);
243 }
244
245 if let Some(retry) = self.retry {
246 event = event.retry(Duration::from_millis(retry));
247 }
248
249 event
250 }
251}
252
253pub struct SseState<P: SseEventProducer> {
258 producer: Arc<P>,
260 event_schema: Option<Arc<jsonschema::Validator>>,
262}
263
264impl<P: SseEventProducer> Clone for SseState<P> {
265 fn clone(&self) -> Self {
266 Self {
267 producer: Arc::clone(&self.producer),
268 event_schema: self.event_schema.clone(),
269 }
270 }
271}
272
273impl<P: SseEventProducer + 'static> SseState<P> {
274 pub fn new(producer: P) -> Self {
288 Self {
289 producer: Arc::new(producer),
290 event_schema: None,
291 }
292 }
293
294 pub fn with_schema(producer: P, event_schema: Option<serde_json::Value>) -> Result<Self, String> {
322 let event_validator = if let Some(schema) = event_schema {
323 Some(Arc::new(
324 jsonschema::validator_for(&schema).map_err(|e| format!("Invalid event schema: {}", e))?,
325 ))
326 } else {
327 None
328 };
329
330 Ok(Self {
331 producer: Arc::new(producer),
332 event_schema: event_validator,
333 })
334 }
335}
336
337pub async fn sse_handler<P: SseEventProducer + 'static>(State(state): State<SseState<P>>) -> impl IntoResponse {
366 info!("SSE client connected");
367
368 state.producer.on_connect().await;
369
370 let producer = Arc::clone(&state.producer);
371 let event_schema = state.event_schema.clone();
372 let stream = stream::unfold((producer, event_schema), |(producer, event_schema)| async move {
373 match producer.next_event().await {
374 Some(sse_event) => {
375 debug!("Sending SSE event: {:?}", sse_event.event_type);
376
377 if let Some(validator) = &event_schema
378 && !validator.is_valid(&sse_event.data)
379 {
380 error!("SSE event validation failed");
381 return Some((
382 Ok::<_, Infallible>(Event::default().data("validation_error")),
383 (producer, event_schema),
384 ));
385 }
386
387 let event = sse_event.into_axum_event();
388 Some((Ok::<_, Infallible>(event), (producer, event_schema)))
389 }
390 None => {
391 info!("SSE stream ended");
392 producer.on_disconnect().await;
393 None
394 }
395 }
396 });
397
398 let sse_response =
399 Sse::new(stream).keep_alive(KeepAlive::new().interval(Duration::from_secs(15)).text("keep-alive"));
400
401 sse_response.into_response()
402}
403
#[cfg(test)]
mod tests {
    use super::*;
    use serde_json::json;
    use std::f64::consts::PI;
    use std::sync::Arc;
    use std::sync::atomic::{AtomicBool, AtomicUsize, Ordering};

    // ---------------------------------------------------------------------
    // Test producers and helpers
    // ---------------------------------------------------------------------

    /// Emits three `{"message": "Event N"}` events (N = 0, 1, 2) and then `None`.
    #[derive(Default)]
    struct TestProducer {
        count: AtomicUsize,
    }

    impl SseEventProducer for TestProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let n = self.count.fetch_add(1, Ordering::Relaxed);
            (n < 3).then(|| SseEvent::new(json!({ "message": format!("Event {}", n) })))
        }
    }

    /// Emits two events and records hook invocations in counters shared with the test.
    struct LifecycleProducer {
        connect_count: Arc<AtomicUsize>,
        disconnect_count: Arc<AtomicUsize>,
        event_count: AtomicUsize,
    }

    impl LifecycleProducer {
        fn new(connect: Arc<AtomicUsize>, disconnect: Arc<AtomicUsize>) -> Self {
            Self {
                connect_count: connect,
                disconnect_count: disconnect,
                event_count: AtomicUsize::new(0),
            }
        }
    }

    impl SseEventProducer for LifecycleProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let idx = self.event_count.fetch_add(1, Ordering::Relaxed);
            (idx < 2).then(|| SseEvent::new(json!({ "event": idx })))
        }

        async fn on_connect(&self) {
            self.connect_count.fetch_add(1, Ordering::Relaxed);
        }

        async fn on_disconnect(&self) {
            self.disconnect_count.fetch_add(1, Ordering::Relaxed);
        }
    }

    /// Emits a single event whose `text` field contains embedded newlines.
    #[derive(Default)]
    struct MultilineProducer {
        sent: AtomicBool,
    }

    impl SseEventProducer for MultilineProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let first_call = !self.sent.swap(true, Ordering::Relaxed);
            first_call.then(|| SseEvent::new(json!({ "text": "line1\nline2\nline3" })))
        }
    }

    /// Emits a single event whose `data` field contains quotes, a backslash, a tab and CRLF.
    #[derive(Default)]
    struct SpecialCharsProducer {
        sent: AtomicBool,
    }

    impl SseEventProducer for SpecialCharsProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let first_call = !self.sent.swap(true, Ordering::Relaxed);
            first_call.then(|| {
                SseEvent::new(json!({
                    "data": "special: \"quotes\", \\ backslash, \t tab, \r\n crlf"
                }))
            })
        }
    }

    /// Emits a single event with a 100 000 character `payload` string.
    #[derive(Default)]
    struct LargePayloadProducer {
        sent: AtomicBool,
    }

    impl SseEventProducer for LargePayloadProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let first_call = !self.sent.swap(true, Ordering::Relaxed);
            first_call.then(|| {
                let large_string = "x".repeat(100_000);
                SseEvent::new(json!({ "payload": large_string }))
            })
        }
    }

    /// Emits `event_count` events of the form `{"id": N, "data": "event_N"}` and then `None`.
    struct RapidEventProducer {
        event_count: usize,
        current: AtomicUsize,
    }

    impl RapidEventProducer {
        fn new(count: usize) -> Self {
            Self {
                event_count: count,
                current: AtomicUsize::new(0),
            }
        }
    }

    impl SseEventProducer for RapidEventProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let idx = self.current.fetch_add(1, Ordering::Relaxed);
            (idx < self.event_count).then(|| {
                SseEvent::new(json!({
                    "id": idx,
                    "data": format!("event_{}", idx)
                }))
            })
        }
    }

    /// Emits a single event that has a type, an id and a retry interval set.
    #[derive(Default)]
    struct FullFieldProducer {
        sent: AtomicBool,
    }

    impl SseEventProducer for FullFieldProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            let first_call = !self.sent.swap(true, Ordering::Relaxed);
            first_call.then(|| {
                SseEvent::with_type(
                    "counter_update",
                    json!({
                        "count": 42,
                        "status": "active"
                    }),
                )
                .with_id("event-123")
                .with_retry(5000)
            })
        }
    }

    /// Never emits an event.
    struct NoEventProducer;

    impl SseEventProducer for NoEventProducer {
        async fn next_event(&self) -> Option<SseEvent> {
            None
        }
    }

    /// Runs the axum conversion for `event`; the calling test fails if the conversion panics.
    fn assert_converts(event: SseEvent) {
        let _axum_event: axum::response::sse::Event = event.into_axum_event();
    }

    /// Polls `producer` until it returns `None` and collects every event it yielded.
    async fn drain<P: SseEventProducer>(producer: &P) -> Vec<SseEvent> {
        let mut events = Vec::new();
        while let Some(event) = producer.next_event().await {
            events.push(event);
        }
        events
    }

    /// Object schema with a string `message` property, shared by the schema tests.
    fn message_schema() -> serde_json::Value {
        json!({
            "type": "object",
            "properties": {
                "message": {"type": "string"}
            }
        })
    }

    // ---------------------------------------------------------------------
    // SseEvent construction
    // ---------------------------------------------------------------------

    /// `new` leaves every optional field unset.
    #[test]
    fn test_sse_event_creation_minimal() {
        let event = SseEvent::new(json!({"test": "data"}));
        assert!(event.event_type.is_none());
        assert!(event.id.is_none());
        assert!(event.retry.is_none());
    }

    /// `with_type` plus both builders populate all three optional fields.
    #[test]
    fn test_sse_event_with_all_fields() {
        let event = SseEvent::with_type("update", json!({"count": 42}))
            .with_id("event-001")
            .with_retry(3000);

        assert_eq!(event.event_type.as_deref(), Some("update"));
        assert_eq!(event.id.as_deref(), Some("event-001"));
        assert_eq!(event.retry, Some(3000));
    }

    /// Same as above with different values, exercising the chained builder form.
    #[test]
    fn test_sse_event_builder_pattern() {
        let event = SseEvent::with_type("notification", json!({"text": "hello"}))
            .with_id("notif-456")
            .with_retry(5000);

        assert_eq!(event.event_type.as_deref(), Some("notification"));
        assert_eq!(event.id.as_deref(), Some("notif-456"));
        assert_eq!(event.retry, Some(5000));
    }

    /// Newlines inside a string value are stored unchanged.
    #[test]
    fn test_sse_event_multiline_data() {
        let event = SseEvent::new(json!({
            "text": "line1\nline2\nline3"
        }));

        assert!(event.data.is_object());
        let text = event.data.get("text").and_then(|v| v.as_str());
        assert_eq!(text, Some("line1\nline2\nline3"));
    }

    /// Quotes and backslashes are accepted in string values.
    #[test]
    fn test_sse_event_special_characters() {
        let event = SseEvent::new(json!({
            "data": "special: \"quotes\", \\ backslash"
        }));

        assert!(event.data.is_object());
    }

    /// A 100 000 character payload is stored at full length.
    #[test]
    fn test_sse_event_large_payload() {
        let large_string = "x".repeat(100_000);
        let event = SseEvent::new(json!({ "payload": large_string }));

        let payload = event.data.get("payload").and_then(|v| v.as_str());
        assert_eq!(payload.map(|s| s.len()), Some(100_000));
    }

    /// A data-only event converts to an axum event.
    #[test]
    fn test_sse_event_into_axum_event_conversion() {
        assert_converts(SseEvent::new(json!({"msg": "test"})));
    }

    /// An event with every field set converts to an axum event.
    #[test]
    fn test_sse_event_into_axum_with_all_fields() {
        let event = SseEvent::with_type("event", json!({"id": 1}))
            .with_id("123")
            .with_retry(5000);

        assert_converts(event);
    }

    // ---------------------------------------------------------------------
    // SseState construction and cloning
    // ---------------------------------------------------------------------

    /// Cloning a state shares the producer allocation.
    #[test]
    fn test_sse_state_creation() {
        let state = SseState::new(TestProducer::default());
        let cloned = state.clone();
        assert!(Arc::ptr_eq(&state.producer, &cloned.producer));
    }

    /// A well-formed schema is accepted.
    #[test]
    fn test_sse_state_with_schema_valid() {
        let result = SseState::with_schema(TestProducer::default(), Some(message_schema()));
        assert!(result.is_ok());
    }

    /// A schema with an unknown `type` is rejected.
    #[test]
    fn test_sse_state_with_invalid_schema() {
        let invalid_schema = json!({
            "type": "not-a-valid-type"
        });

        let result = SseState::with_schema(TestProducer::default(), Some(invalid_schema));
        assert!(result.is_err());
    }

    /// Passing no schema succeeds.
    #[test]
    fn test_sse_state_with_schema_none() {
        let result = SseState::with_schema(TestProducer::default(), None);
        assert!(result.is_ok());
    }

    // ---------------------------------------------------------------------
    // Producer behaviour
    // ---------------------------------------------------------------------

    /// The overridden `on_connect` hook runs when awaited.
    #[tokio::test]
    async fn test_sse_lifecycle_on_connect_called() {
        let connect_count = Arc::new(AtomicUsize::new(0));
        let disconnect_count = Arc::new(AtomicUsize::new(0));
        let producer = LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));

        producer.on_connect().await;
        assert_eq!(connect_count.load(Ordering::Relaxed), 1);
    }

    /// The overridden `on_disconnect` hook runs when awaited.
    #[tokio::test]
    async fn test_sse_lifecycle_on_disconnect_called() {
        let connect_count = Arc::new(AtomicUsize::new(0));
        let disconnect_count = Arc::new(AtomicUsize::new(0));
        let producer = LifecycleProducer::new(Arc::clone(&connect_count), Arc::clone(&disconnect_count));

        producer.on_disconnect().await;
        assert_eq!(disconnect_count.load(Ordering::Relaxed), 1);
    }

    /// Ids produced by consecutive calls are strictly increasing.
    #[tokio::test]
    async fn test_sse_event_ordering_preserved() {
        let producer = RapidEventProducer::new(10);

        // `None < Some(_)`, so the first id always passes the comparison.
        let mut previous: Option<i64> = None;
        for _ in 0..10 {
            let event = producer.next_event().await.expect("producer should yield ten events");
            let id = event
                .data
                .get("id")
                .and_then(|v| v.as_i64())
                .expect("event should carry an integer id");
            assert!(Some(id) > previous, "Event ordering violated");
            previous = Some(id);
        }
    }

    /// Draining a 100-event producer yields exactly 100 events.
    #[tokio::test]
    async fn test_sse_rapid_event_sending() {
        let producer = RapidEventProducer::new(100);
        assert_eq!(drain(&producer).await.len(), 100);
    }

    /// An empty JSON object is valid event data.
    #[test]
    fn test_sse_event_with_empty_data_object() {
        let event = SseEvent::new(json!({}));
        assert!(event.data.is_object());
    }

    /// Nested objects are stored and can be traversed.
    #[test]
    fn test_sse_event_with_nested_data() {
        let event = SseEvent::new(json!({
            "nested": {
                "deep": {
                    "value": "found"
                }
            }
        }));

        let deep_value = event
            .data
            .get("nested")
            .and_then(|v| v.get("deep"))
            .and_then(|v| v.get("value"))
            .and_then(|v| v.as_str());

        assert_eq!(deep_value, Some("found"));
    }

    /// A producer without events keeps returning `None` on repeated polls.
    #[tokio::test]
    async fn test_sse_producer_stream_ends_cleanly() {
        let producer = NoEventProducer;

        assert!(producer.next_event().await.is_none());
        assert!(producer.next_event().await.is_none());
    }

    /// `Clone` copies every field.
    #[test]
    fn test_sse_event_clone() {
        let original = SseEvent::with_type("test", json!({"data": "test"}))
            .with_id("id-1")
            .with_retry(2000);

        let cloned = original.clone();

        assert_eq!(cloned.event_type, original.event_type);
        assert_eq!(cloned.id, original.id);
        assert_eq!(cloned.retry, original.retry);
        assert_eq!(cloned.data, original.data);
    }

    /// The `Debug` output names the type.
    #[test]
    fn test_sse_event_debug_impl() {
        let event = SseEvent::new(json!({"msg": "debug"}));
        let debug_str = format!("{:?}", event);
        assert!(debug_str.contains("SseEvent"));
    }

    /// Two producer instances keep separate counters.
    #[tokio::test]
    async fn test_sse_multiple_producers_independent() {
        let producer1 = TestProducer::default();
        let producer2 = TestProducer::default();

        let _event1 = producer1.next_event().await;
        let _event2 = producer2.next_event().await;

        assert_eq!(producer1.count.load(Ordering::Relaxed), 1);
        assert_eq!(producer2.count.load(Ordering::Relaxed), 1);
    }

    /// Cloning a state shares both the producer and the compiled schema.
    #[test]
    fn test_sse_state_cloning_preserves_schema() {
        let state = SseState::with_schema(TestProducer::default(), Some(message_schema()))
            .expect("schema should be valid");
        let cloned = state.clone();

        assert!(Arc::ptr_eq(&state.producer, &cloned.producer));
        let (Some(original), Some(copy)) = (&state.event_schema, &cloned.event_schema) else {
            panic!("Schema should be preserved in clone");
        };
        assert!(Arc::ptr_eq(original, copy));
    }

    /// The large-payload producer delivers the full 100 000 characters.
    #[tokio::test]
    async fn test_sse_large_payload_integrity() {
        let producer = LargePayloadProducer::default();

        let event = producer.next_event().await.expect("producer should yield one event");
        let payload = event.data.get("payload").and_then(|v| v.as_str());
        assert_eq!(payload.map(|s| s.len()), Some(100_000));
    }

    /// The multiline producer delivers its text with newlines intact.
    #[tokio::test]
    async fn test_sse_multiline_data_preservation() {
        let producer = MultilineProducer::default();

        let event = producer.next_event().await.expect("producer should yield one event");
        let text = event.data.get("text").and_then(|v| v.as_str());
        assert_eq!(text, Some("line1\nline2\nline3"));
    }

    /// The special-characters producer delivers a string payload containing `quotes`.
    #[tokio::test]
    async fn test_sse_special_chars_in_payload() {
        let producer = SpecialCharsProducer::default();

        let event = producer.next_event().await.expect("producer should yield one event");
        let data = event.data.get("data").and_then(|v| v.as_str());
        assert!(data.is_some_and(|s| s.contains("quotes")));
    }

    /// The full-field producer delivers type, id, retry and data together.
    #[tokio::test]
    async fn test_sse_full_event_fields_together() {
        let producer = FullFieldProducer::default();

        let event = producer.next_event().await.expect("producer should yield one event");
        assert_eq!(event.event_type.as_deref(), Some("counter_update"));
        assert_eq!(event.id.as_deref(), Some("event-123"));
        assert_eq!(event.retry, Some(5000));
        assert_eq!(event.data.get("count").and_then(|v| v.as_i64()), Some(42));
    }

    // ---------------------------------------------------------------------
    // Conversion to axum events: field combinations
    // ---------------------------------------------------------------------

    /// A simple key/value payload converts.
    #[test]
    fn test_sse_event_to_axum_preserves_data() {
        assert_converts(SseEvent::new(json!({"key": "value"})));
    }

    /// A data-only event has no metadata and still converts.
    #[test]
    fn test_sse_event_data_only_no_metadata() {
        let event = SseEvent::new(json!({"message": "hello"}));
        assert!(event.event_type.is_none(), "event_type should be None");
        assert!(event.id.is_none(), "id should be None");
        assert!(event.retry.is_none(), "retry should be None");

        assert_converts(event);
    }

    /// An event with all fields set keeps them and converts.
    #[test]
    fn test_sse_event_with_all_fields_filled() {
        let event = SseEvent::with_type("update", json!({"status": "ok"}))
            .with_id("evt-999")
            .with_retry(10000);

        assert_eq!(event.event_type.as_deref(), Some("update"));
        assert_eq!(event.id.as_deref(), Some("evt-999"));
        assert_eq!(event.retry, Some(10000));

        assert_converts(event);
    }

    /// An empty object payload converts.
    #[test]
    fn test_sse_event_empty_data_field() {
        let event = SseEvent::new(json!({}));
        assert!(event.data.is_object());
        assert_eq!(event.data.as_object().map(|m| m.len()), Some(0));

        assert_converts(event);
    }

    /// A string value with newlines is stored unchanged and converts.
    #[test]
    fn test_sse_event_data_with_newlines_in_string() {
        let multiline_data = "first line\nsecond line\nthird line";
        let event = SseEvent::new(json!({"text": multiline_data}));

        assert_eq!(event.data.get("text").and_then(|v| v.as_str()), Some(multiline_data));

        assert_converts(event);
    }

    /// A string value with colons is stored unchanged and converts.
    #[test]
    fn test_sse_event_data_with_colons() {
        let data_with_colons = "key1: value1, key2: value2";
        let event = SseEvent::new(json!({"data": data_with_colons}));

        assert_eq!(event.data.get("data").and_then(|v| v.as_str()), Some(data_with_colons));

        assert_converts(event);
    }

    /// A payload with a `comment` key converts like any other payload.
    #[test]
    fn test_sse_event_comment_only_structure() {
        assert_converts(SseEvent::new(json!({"comment": "this is a comment"})));
    }

    /// An event type containing spaces is stored and converts.
    #[test]
    fn test_sse_event_type_with_spaces() {
        let event = SseEvent::with_type("event type with spaces", json!({"data": "test"}));
        assert_eq!(event.event_type.as_deref(), Some("event type with spaces"));

        assert_converts(event);
    }

    /// Event types with dashes, underscores and dots are stored and convert.
    #[test]
    fn test_sse_event_type_with_special_chars() {
        for event_type in ["update-v2", "event_123", "message.sent", "type-with-dash"] {
            let event = SseEvent::with_type(event_type, json!({"data": "test"}));
            assert_eq!(event.event_type.as_deref(), Some(event_type));

            assert_converts(event);
        }
    }

    /// Alphanumeric ids with dashes and underscores are stored and convert.
    #[test]
    fn test_sse_event_id_alphanumeric() {
        for id in ["123", "abc-def", "event_001", "id-with-dashes-123"] {
            let event = SseEvent::new(json!({"data": "test"})).with_id(id);
            assert_eq!(event.id.as_deref(), Some(id));

            assert_converts(event);
        }
    }

    /// A retry interval of zero is stored and converts.
    #[test]
    fn test_sse_event_retry_zero() {
        let event = SseEvent::new(json!({"data": "test"})).with_retry(0);
        assert_eq!(event.retry, Some(0));

        assert_converts(event);
    }

    /// A small retry interval is stored and converts.
    #[test]
    fn test_sse_event_retry_small_value() {
        let event = SseEvent::new(json!({"data": "test"})).with_retry(100);
        assert_eq!(event.retry, Some(100));

        assert_converts(event);
    }

    /// A very large retry interval is stored and converts.
    #[test]
    fn test_sse_event_retry_large_value() {
        let large_retry = u64::MAX / 2;
        let event = SseEvent::new(json!({"data": "test"})).with_retry(large_retry);
        assert_eq!(event.retry, Some(large_retry));

        assert_converts(event);
    }

    /// Typical retry intervals are stored and convert.
    #[test]
    fn test_sse_event_retry_typical_values() {
        for retry_ms in [1000, 3000, 5000, 10000, 30000] {
            let event = SseEvent::new(json!({"data": "test"})).with_retry(retry_ms);
            assert_eq!(event.retry, Some(retry_ms));

            assert_converts(event);
        }
    }

    // ---------------------------------------------------------------------
    // Conversion to axum events: payload contents
    // ---------------------------------------------------------------------

    /// Emoji in a string value are stored unchanged and convert.
    #[test]
    fn test_sse_event_utf8_emoji_in_data() {
        let emoji_data = "Hello 👋 World 🌍";
        let event = SseEvent::new(json!({"text": emoji_data}));

        assert_eq!(event.data.get("text").and_then(|v| v.as_str()), Some(emoji_data));

        assert_converts(event);
    }

    /// Chinese text in a string value is stored unchanged and converts.
    #[test]
    fn test_sse_event_utf8_chinese_characters() {
        let chinese_text = "你好世界";
        let event = SseEvent::new(json!({"text": chinese_text}));

        assert_eq!(event.data.get("text").and_then(|v| v.as_str()), Some(chinese_text));

        assert_converts(event);
    }

    /// Arabic text in a string value is stored unchanged and converts.
    #[test]
    fn test_sse_event_utf8_arabic_characters() {
        let arabic_text = "مرحبا بالعالم";
        let event = SseEvent::new(json!({"text": arabic_text}));

        assert_eq!(event.data.get("text").and_then(|v| v.as_str()), Some(arabic_text));

        assert_converts(event);
    }

    /// Mixed scripts in a string value are stored unchanged and convert.
    #[test]
    fn test_sse_event_utf8_mixed_scripts() {
        let mixed = "Hello 你好 مرحبا 👋";
        let event = SseEvent::new(json!({"text": mixed}));

        assert_eq!(event.data.get("text").and_then(|v| v.as_str()), Some(mixed));

        assert_converts(event);
    }

    /// A plain ASCII payload converts.
    #[test]
    fn test_sse_event_json_serialization_produces_valid_utf8() {
        assert_converts(SseEvent::new(json!({"text": "test"})));
    }

    /// A 64 KiB payload is stored at full length and converts.
    #[test]
    fn test_sse_event_64kb_payload() {
        let large_data = "x".repeat(65536);
        let event = SseEvent::new(json!({ "payload": large_data }));

        let stored = event.data.get("payload").and_then(|v| v.as_str());
        assert_eq!(stored.map(|s| s.len()), Some(65536));

        assert_converts(event);
    }

    /// A 1 000 000 character payload is stored at full length and converts.
    #[test]
    fn test_sse_event_1mb_payload() {
        let large_data = "y".repeat(1_000_000);
        let event = SseEvent::new(json!({ "payload": large_data }));

        let stored = event.data.get("payload").and_then(|v| v.as_str());
        assert_eq!(stored.map(|s| s.len()), Some(1_000_000));

        assert_converts(event);
    }

    /// A payload nested seven levels deep converts.
    #[test]
    fn test_sse_event_deeply_nested_json() {
        let deeply_nested = json!({
            "level1": {
                "level2": {
                    "level3": {
                        "level4": {
                            "level5": {
                                "level6": {
                                    "level7": {
                                        "value": "deep"
                                    }
                                }
                            }
                        }
                    }
                }
            }
        });

        assert_converts(SseEvent::new(deeply_nested));
    }

    /// An array value is stored with all elements and converts.
    #[test]
    fn test_sse_event_array_in_data() {
        let event = SseEvent::new(json!({
            "items": [1, 2, 3, 4, 5]
        }));

        let items = event.data.get("items").and_then(|v| v.as_array());
        assert_eq!(items.map(|a| a.len()), Some(5));

        assert_converts(event);
    }

    /// A `null` value is stored as JSON null and converts.
    #[test]
    fn test_sse_event_null_value_in_data() {
        let event = SseEvent::new(json!({
            "nullable": null
        }));

        assert!(event.data.get("nullable").is_some_and(|v| v.is_null()));

        assert_converts(event);
    }

    /// Boolean values are stored unchanged and convert.
    #[test]
    fn test_sse_event_boolean_values() {
        let event = SseEvent::new(json!({
            "active": true,
            "deleted": false
        }));

        assert_eq!(event.data.get("active").and_then(|v| v.as_bool()), Some(true));
        assert_eq!(event.data.get("deleted").and_then(|v| v.as_bool()), Some(false));

        assert_converts(event);
    }

    // ---------------------------------------------------------------------
    // Sequencing and builder behaviour
    // ---------------------------------------------------------------------

    /// A five-event producer yields exactly five events.
    #[tokio::test]
    async fn test_sse_last_event_id_header_simulation() {
        let producer = RapidEventProducer::new(5);
        assert_eq!(drain(&producer).await.len(), 5);
    }

    /// The retry interval set by the producer reaches the consumer.
    #[tokio::test]
    async fn test_sse_retry_timeout_specification() {
        let producer = FullFieldProducer::default();

        let event = producer.next_event().await.expect("producer should yield one event");
        assert_eq!(event.retry, Some(5000), "Retry should be 5000ms");
    }

    /// Builders chain from both constructors.
    #[test]
    fn test_sse_event_builder_method_chaining() {
        let event = SseEvent::new(json!({"data": "test"}))
            .with_id("id-1")
            .with_retry(3000);

        assert_eq!(event.id.as_deref(), Some("id-1"));
        assert_eq!(event.retry, Some(3000));

        let event2 = SseEvent::with_type("msg", json!({"x": 1}))
            .with_id("id-2")
            .with_retry(5000);

        assert_eq!(event2.event_type.as_deref(), Some("msg"));
        assert_eq!(event2.id.as_deref(), Some("id-2"));
        assert_eq!(event2.retry, Some(5000));
    }

    /// Calling a builder a second time replaces the value set by the first call.
    #[test]
    fn test_sse_event_overwriting_fields() {
        let event = SseEvent::new(json!({"v": 1}))
            .with_id("id-original")
            .with_retry(1000);

        assert_eq!(event.id.as_deref(), Some("id-original"));
        assert_eq!(event.retry, Some(1000));

        let event = event.with_id("id-replaced").with_retry(2000);

        assert_eq!(event.id.as_deref(), Some("id-replaced"));
        assert_eq!(event.retry, Some(2000));
    }

    /// An empty event type is stored and converts.
    #[test]
    fn test_sse_event_type_empty_string() {
        let event = SseEvent::with_type("", json!({"data": "test"}));
        assert_eq!(event.event_type.as_deref(), Some(""));

        assert_converts(event);
    }

    /// An empty id is stored and converts.
    #[test]
    fn test_sse_event_id_empty_string() {
        let event = SseEvent::new(json!({"data": "test"})).with_id("");
        assert_eq!(event.id.as_deref(), Some(""));

        assert_converts(event);
    }

    /// Ids come out as 0, 1, …, 9 in that order.
    #[tokio::test]
    async fn test_sse_event_sequence_maintains_order() {
        let producer = RapidEventProducer::new(10);

        let event_ids: Vec<i64> = drain(&producer)
            .await
            .iter()
            .filter_map(|event| event.data.get("id").and_then(|v| v.as_i64()))
            .collect();

        let expected: Vec<i64> = (0..10).collect();
        assert_eq!(event_ids, expected, "Event order should match insertion order");
    }

    /// Draining a 50-event producer yields exactly 50 events.
    #[tokio::test]
    async fn test_sse_rapid_events_no_loss() {
        let producer = RapidEventProducer::new(50);
        assert_eq!(drain(&producer).await.len(), 50, "All events should be produced without loss");
    }

    /// Twenty events grouped five at a time form at least four batches.
    #[tokio::test]
    async fn test_sse_event_batching_simulation() {
        let producer = RapidEventProducer::new(20);

        let events = drain(&producer).await;
        // `chunks` also counts a trailing partial batch, if there is one.
        let batch_count = events.chunks(5).count();

        assert!(batch_count >= 4, "Should have processed at least 4 batches");
    }

    /// Every clone in a chain shares the same producer allocation.
    #[test]
    fn test_sse_state_arc_sharing() {
        let state1 = SseState::new(TestProducer::default());
        let state2 = state1.clone();
        let state3 = state2.clone();

        assert!(Arc::ptr_eq(&state1.producer, &state2.producer));
        assert!(Arc::ptr_eq(&state2.producer, &state3.producer));
    }

    /// A cloned state shares the compiled schema allocation.
    #[test]
    fn test_sse_state_schema_arc_sharing() {
        let schema = json!({
            "type": "object"
        });

        let state1 = SseState::with_schema(TestProducer::default(), Some(schema)).expect("schema should be valid");
        let state2 = state1.clone();

        let (Some(first), Some(second)) = (&state1.event_schema, &state2.event_schema) else {
            panic!("Both states should have schema");
        };
        assert!(Arc::ptr_eq(first, second));
    }

    /// Integer, float and negative numbers convert.
    #[test]
    fn test_sse_event_into_axum_event_numeric_data() {
        assert_converts(SseEvent::new(json!({
            "count": 42,
            "temperature": 98.6,
            "negative": -273
        })));
    }

    /// A full-precision float and the largest exactly representable integer are stored
    /// unchanged and convert.
    #[test]
    fn test_sse_event_json_number_precision() {
        let event = SseEvent::new(json!({
            "float": PI,
            "large_int": 9_007_199_254_740_991_i64
        }));

        assert_eq!(event.data.get("float").and_then(|v| v.as_f64()), Some(PI));
        assert_eq!(
            event.data.get("large_int").and_then(|v| v.as_i64()),
            Some(9_007_199_254_740_991)
        );

        assert_converts(event);
    }

    /// Strings that need JSON escaping convert.
    #[test]
    fn test_sse_event_string_escaping() {
        assert_converts(SseEvent::new(json!({
            "escaped": "line1\nline2\ttab",
            "quotes": "He said \"hello\"",
            "backslash": "path\\to\\file"
        })));
    }

    /// A payload mixing every JSON type converts.
    #[test]
    fn test_sse_event_all_json_types_combined() {
        assert_converts(SseEvent::new(json!({
            "string": "text",
            "number": 123,
            "float": 1.5,
            "boolean": true,
            "null_value": null,
            "array": [1, 2, 3],
            "object": {
                "nested": "value"
            }
        })));
    }
}