1use bytes::Bytes;
8use std::future::Future;
9use std::pin::Pin;
10use tonic::metadata::MetadataMap;
11
12use super::streaming::MessageStream;
13
/// Call shape a [`GrpcHandler`] reports through [`GrpcHandler::rpc_mode`].
///
/// Each variant lines up with one of the handler entry points declared on the trait.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcMode {
    /// One request in, one response out; matches [`GrpcHandler::call`].
    Unary,
    /// One request in, a stream of messages out; matches [`GrpcHandler::call_server_stream`].
    ServerStreaming,
    /// A streaming request in, one response out; matches [`GrpcHandler::call_client_stream`].
    ClientStreaming,
    /// A streaming request in, a stream of messages out; matches [`GrpcHandler::call_bidi_stream`].
    BidirectionalStreaming,
}
29
/// Owned description of a single incoming request handed to a [`GrpcHandler`].
#[derive(Debug, Clone)]
pub struct GrpcRequestData {
    /// Name of the targeted service, e.g. `"mypackage.MyService"`.
    pub service_name: String,
    /// Name of the targeted method, e.g. `"GetUser"`.
    pub method_name: String,
    /// Request body as raw bytes; nothing in this file interprets the encoding
    /// (presumably a serialized protobuf message — TODO confirm against the caller).
    pub payload: Bytes,
    /// Metadata entries that accompanied the request.
    pub metadata: MetadataMap,
}
47
/// Owned description of a single response produced by a [`GrpcHandler`].
#[derive(Debug, Clone)]
pub struct GrpcResponseData {
    /// Response body as raw bytes; nothing in this file interprets the encoding
    /// (presumably a serialized protobuf message — TODO confirm against the caller).
    pub payload: Bytes,
    /// Metadata entries to send along with the response.
    pub metadata: MetadataMap,
}
59
/// Outcome of a single-response handler call: the response on success, a `tonic::Status` on failure.
pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
66
67pub trait GrpcHandler: Send + Sync {
192 fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>>;
207
208 fn service_name(&self) -> &str;
223
224 fn rpc_mode(&self) -> RpcMode {
231 RpcMode::Unary
232 }
233
234 fn call_server_stream(
247 &self,
248 _request: GrpcRequestData,
249 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
250 Box::pin(async { Err(tonic::Status::unimplemented("Server streaming not supported")) })
251 }
252
253 fn call_client_stream(
258 &self,
259 _request: crate::grpc::streaming::StreamingRequest,
260 ) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send>> {
261 Box::pin(async { Err(tonic::Status::unimplemented("Client streaming not supported")) })
262 }
263
264 fn call_bidi_stream(
269 &self,
270 _request: crate::grpc::streaming::StreamingRequest,
271 ) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send>> {
272 Box::pin(async { Err(tonic::Status::unimplemented("Bidirectional streaming not supported")) })
273 }
274}
275
#[cfg(test)]
mod tests {
    use super::super::streaming::{empty_message_stream, message_stream_from_vec, single_message_stream};
    use super::*;
    use futures_util::StreamExt;

    /// Boxed future type returned by [`GrpcHandler::call`].
    type UnaryFuture = Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>>;

    /// Boxed future type returned by [`GrpcHandler::call_server_stream`].
    type ServerStreamFuture = Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>>;

    /// Produces the outcome of a server-streaming call.
    ///
    /// A plain `fn` pointer (rather than a boxed closure) keeps the stub handlers
    /// trivially `Send + Sync`, as the trait requires.
    type StreamFactory = fn(GrpcRequestData) -> Result<MessageStream, tonic::Status>;

    /// Builds a request for `service`/`method` carrying `payload` and no metadata.
    fn make_request(service: &str, method: &str, payload: Bytes) -> GrpcRequestData {
        GrpcRequestData {
            service_name: service.to_string(),
            method_name: method.to_string(),
            payload,
            metadata: MetadataMap::new(),
        }
    }

    /// Returns a unary future that succeeds with `payload` and empty metadata.
    fn unary_ok(payload: Bytes) -> UnaryFuture {
        Box::pin(async move {
            Ok(GrpcResponseData {
                payload,
                metadata: MetadataMap::new(),
            })
        })
    }

    /// Extracts the error from a server-streaming result, panicking on `Ok`.
    ///
    /// `Result::unwrap_err` cannot be used here because it requires the `Ok` type to be `Debug`.
    fn expect_stream_error(result: Result<MessageStream, tonic::Status>) -> tonic::Status {
        match result {
            Err(status) => status,
            Ok(_) => panic!("Expected error, got Ok"),
        }
    }

    /// Drains `stream` to its end, panicking if any item is an error.
    async fn collect_messages(mut stream: MessageStream) -> Vec<Bytes> {
        let mut messages = Vec::new();
        while let Some(item) = stream.next().await {
            messages.push(item.expect("stream item should be Ok"));
        }
        messages
    }

    /// Builds `count` payloads of the form `<prefix><index>`, starting at index 0.
    fn numbered_messages(prefix: &str, count: usize) -> Vec<Bytes> {
        (0..count).map(|i| Bytes::from(format!("{}{}", prefix, i))).collect()
    }

    /// Unary handler that overrides nothing, so every trait default stays observable.
    struct TestGrpcHandler;

    impl GrpcHandler for TestGrpcHandler {
        fn call(&self, _request: GrpcRequestData) -> UnaryFuture {
            unary_ok(Bytes::from("test response"))
        }

        fn service_name(&self) -> &str {
            "test.TestService"
        }
    }

    /// Handler that overrides only `rpc_mode`, leaving the streaming defaults in place.
    struct ModeOnlyHandler {
        service: &'static str,
        mode: RpcMode,
    }

    impl GrpcHandler for ModeOnlyHandler {
        fn call(&self, _request: GrpcRequestData) -> UnaryFuture {
            unary_ok(Bytes::new())
        }

        fn service_name(&self) -> &str {
            self.service
        }

        fn rpc_mode(&self) -> RpcMode {
            self.mode
        }
    }

    /// Server-streaming handler whose stream (or error) is produced by `factory`.
    struct StreamingHandler {
        service: &'static str,
        factory: StreamFactory,
    }

    impl GrpcHandler for StreamingHandler {
        fn call(&self, _request: GrpcRequestData) -> UnaryFuture {
            unary_ok(Bytes::new())
        }

        fn service_name(&self) -> &str {
            self.service
        }

        fn rpc_mode(&self) -> RpcMode {
            RpcMode::ServerStreaming
        }

        fn call_server_stream(&self, request: GrpcRequestData) -> ServerStreamFuture {
            // Copy the fn pointer out so the future does not borrow `self`.
            let factory = self.factory;
            Box::pin(async move { factory(request) })
        }
    }

    #[tokio::test]
    async fn test_grpc_handler_basic_call() {
        let request = make_request("test.TestService", "TestMethod", Bytes::from("test payload"));

        let response = TestGrpcHandler
            .call(request)
            .await
            .expect("unary call should succeed");

        assert_eq!(response.payload, Bytes::from("test response"));
    }

    #[test]
    fn test_grpc_handler_service_name() {
        assert_eq!(TestGrpcHandler.service_name(), "test.TestService");
    }

    #[test]
    fn test_grpc_handler_default_rpc_mode() {
        assert_eq!(TestGrpcHandler.rpc_mode(), RpcMode::Unary);
    }

    #[tokio::test]
    async fn test_grpc_handler_default_server_stream_unimplemented() {
        let request = make_request("test.TestService", "StreamMethod", Bytes::new());

        let status = expect_stream_error(TestGrpcHandler.call_server_stream(request).await);

        assert_eq!(status.code(), tonic::Code::Unimplemented);
        assert_eq!(status.message(), "Server streaming not supported");
    }

    #[test]
    fn test_grpc_request_data_creation() {
        // Built with a struct literal on purpose: this test covers direct construction.
        let request = GrpcRequestData {
            service_name: "mypackage.MyService".to_string(),
            method_name: "GetUser".to_string(),
            payload: Bytes::from("payload"),
            metadata: MetadataMap::new(),
        };

        assert_eq!(request.service_name, "mypackage.MyService");
        assert_eq!(request.method_name, "GetUser");
        assert_eq!(request.payload, Bytes::from("payload"));
    }

    #[test]
    fn test_grpc_response_data_creation() {
        let response = GrpcResponseData {
            payload: Bytes::from("response"),
            metadata: MetadataMap::new(),
        };

        assert_eq!(response.payload, Bytes::from("response"));
        assert!(response.metadata.is_empty());
    }

    #[test]
    fn test_grpc_request_data_clone() {
        let original = make_request("test.Service", "Method", Bytes::from("data"));

        let cloned = original.clone();

        assert_eq!(original.service_name, cloned.service_name);
        assert_eq!(original.method_name, cloned.method_name);
        assert_eq!(original.payload, cloned.payload);
    }

    #[test]
    fn test_grpc_response_data_clone() {
        let original = GrpcResponseData {
            payload: Bytes::from("response data"),
            metadata: MetadataMap::new(),
        };

        let cloned = original.clone();

        assert_eq!(original.payload, cloned.payload);
    }

    #[tokio::test]
    async fn test_grpc_handler_error_response() {
        struct ErrorHandler;

        impl GrpcHandler for ErrorHandler {
            fn call(&self, _request: GrpcRequestData) -> UnaryFuture {
                Box::pin(async { Err(tonic::Status::not_found("Resource not found")) })
            }

            fn service_name(&self) -> &str {
                "test.ErrorService"
            }
        }

        let request = make_request("test.ErrorService", "ErrorMethod", Bytes::new());

        let error = ErrorHandler
            .call(request)
            .await
            .expect_err("unary call should fail");

        assert_eq!(error.code(), tonic::Code::NotFound);
        assert_eq!(error.message(), "Resource not found");
    }

    #[tokio::test]
    async fn test_server_stream_with_multiple_messages() {
        let handler = StreamingHandler {
            service: "test.StreamService",
            factory: |_request: GrpcRequestData| {
                Ok(message_stream_from_vec(vec![
                    Bytes::from("message1"),
                    Bytes::from("message2"),
                    Bytes::from("message3"),
                ]))
            },
        };
        let request = make_request("test.StreamService", "StreamMethod", Bytes::from("request data"));

        let stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        // Comparing the fully drained stream also proves it ends after the third message.
        assert_eq!(
            collect_messages(stream).await,
            vec![
                Bytes::from("message1"),
                Bytes::from("message2"),
                Bytes::from("message3"),
            ]
        );
    }

    #[tokio::test]
    async fn test_server_stream_empty_stream() {
        let handler = StreamingHandler {
            service: "test.EmptyStreamService",
            factory: |_request: GrpcRequestData| Ok(empty_message_stream()),
        };
        let request = make_request("test.EmptyStreamService", "EmptyStream", Bytes::new());

        let mut stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_server_stream_with_error_mid_stream() {
        let handler = StreamingHandler {
            service: "test.ErrorMidStreamService",
            factory: |_request: GrpcRequestData| {
                let items = vec![
                    Ok(Bytes::from("message1")),
                    Ok(Bytes::from("message2")),
                    Err(tonic::Status::internal("Stream error")),
                ];
                let stream: MessageStream = Box::pin(futures_util::stream::iter(items));
                Ok(stream)
            },
        };
        let request = make_request("test.ErrorMidStreamService", "ErrorStream", Bytes::new());

        let mut stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        let first = stream.next().await.unwrap().unwrap();
        assert_eq!(first, Bytes::from("message1"));

        let second = stream.next().await.unwrap().unwrap();
        assert_eq!(second, Bytes::from("message2"));

        let error = stream
            .next()
            .await
            .unwrap()
            .expect_err("third item should be an error");
        assert_eq!(error.code(), tonic::Code::Internal);
        assert_eq!(error.message(), "Stream error");

        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_server_stream_returns_error() {
        let handler = StreamingHandler {
            service: "test.FailingStreamService",
            factory: |_request: GrpcRequestData| Err(tonic::Status::unavailable("Stream unavailable")),
        };
        let request = make_request("test.FailingStreamService", "FailingStream", Bytes::new());

        let status = expect_stream_error(handler.call_server_stream(request).await);

        assert_eq!(status.code(), tonic::Code::Unavailable);
        assert_eq!(status.message(), "Stream unavailable");
    }

    #[tokio::test]
    async fn test_server_stream_with_metadata() {
        let handler = StreamingHandler {
            service: "test.MetadataStreamService",
            factory: |request: GrpcRequestData| {
                assert!(!request.metadata.is_empty());
                // Check the actual header value, not just that some metadata arrived.
                let request_id = request
                    .metadata
                    .get("x-request-id")
                    .and_then(|value| value.to_str().ok());
                assert_eq!(request_id, Some("test-request-123"));

                Ok(message_stream_from_vec(vec![Bytes::from("metadata_message")]))
            },
        };

        let mut metadata = MetadataMap::new();
        metadata.insert(
            "x-request-id",
            "test-request-123"
                .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
                .unwrap(),
        );
        let request = GrpcRequestData {
            metadata,
            ..make_request("test.MetadataStreamService", "MetadataStream", Bytes::new())
        };

        let stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        assert_eq!(
            collect_messages(stream).await,
            vec![Bytes::from("metadata_message")]
        );
    }

    #[tokio::test]
    async fn test_server_stream_large_stream_100_messages() {
        let handler = StreamingHandler {
            service: "test.LargeStreamService",
            factory: |_request: GrpcRequestData| Ok(message_stream_from_vec(numbered_messages("message_", 100))),
        };
        let request = make_request("test.LargeStreamService", "LargeStream", Bytes::new());

        let stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        assert_eq!(collect_messages(stream).await, numbered_messages("message_", 100));
    }

    #[tokio::test]
    async fn test_server_stream_large_stream_500_messages() {
        let handler = StreamingHandler {
            service: "test.VeryLargeStreamService",
            factory: |_request: GrpcRequestData| Ok(message_stream_from_vec(numbered_messages("msg_", 500))),
        };
        let request = make_request("test.VeryLargeStreamService", "VeryLargeStream", Bytes::new());

        let stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        let received = collect_messages(stream).await;
        assert_eq!(received.len(), 500);
        assert_eq!(received, numbered_messages("msg_", 500));
    }

    #[test]
    fn test_rpc_mode_unary() {
        assert_eq!(TestGrpcHandler.rpc_mode(), RpcMode::Unary);
    }

    #[test]
    fn test_rpc_mode_server_streaming() {
        let handler = ModeOnlyHandler {
            service: "test.ServerStreamTestService",
            mode: RpcMode::ServerStreaming,
        };
        assert_eq!(handler.rpc_mode(), RpcMode::ServerStreaming);
    }

    #[test]
    fn test_rpc_mode_client_streaming() {
        let handler = ModeOnlyHandler {
            service: "test.ClientStreamTestService",
            mode: RpcMode::ClientStreaming,
        };
        assert_eq!(handler.rpc_mode(), RpcMode::ClientStreaming);
    }

    #[test]
    fn test_rpc_mode_bidirectional_streaming() {
        let handler = ModeOnlyHandler {
            service: "test.BiDirectionalStreamTestService",
            mode: RpcMode::BidirectionalStreaming,
        };
        assert_eq!(handler.rpc_mode(), RpcMode::BidirectionalStreaming);
    }

    #[tokio::test]
    async fn test_server_stream_single_message() {
        let handler = StreamingHandler {
            service: "test.SingleMessageStreamService",
            factory: |_request: GrpcRequestData| Ok(single_message_stream(Bytes::from("single_msg"))),
        };
        let request = make_request("test.SingleMessageStreamService", "SingleMessage", Bytes::new());

        let mut stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        let message = stream.next().await.unwrap().unwrap();
        assert_eq!(message, Bytes::from("single_msg"));
        assert!(stream.next().await.is_none());
    }

    #[tokio::test]
    async fn test_server_stream_preserves_request_data() {
        let handler = StreamingHandler {
            service: "test.RequestPreservingService",
            factory: |request: GrpcRequestData| {
                assert_eq!(request.service_name, "test.RequestPreservingService");
                assert_eq!(request.method_name, "PreserveTest");
                assert_eq!(request.payload, Bytes::from("test_payload"));

                Ok(message_stream_from_vec(vec![Bytes::from("response")]))
            },
        };
        let request = make_request(
            "test.RequestPreservingService",
            "PreserveTest",
            Bytes::from("test_payload"),
        );

        let stream = handler
            .call_server_stream(request)
            .await
            .expect("stream should open");

        assert_eq!(collect_messages(stream).await, vec![Bytes::from("response")]);
    }

    #[tokio::test]
    async fn test_server_stream_with_various_error_codes() {
        /// Fails every server-streaming call with a status chosen from `error_code`.
        struct ErrorCodeStreamHandler {
            error_code: tonic::Code,
        }

        impl GrpcHandler for ErrorCodeStreamHandler {
            fn call(&self, _request: GrpcRequestData) -> UnaryFuture {
                unary_ok(Bytes::new())
            }

            fn service_name(&self) -> &str {
                "test.ErrorCodeService"
            }

            fn rpc_mode(&self) -> RpcMode {
                RpcMode::ServerStreaming
            }

            fn call_server_stream(&self, _request: GrpcRequestData) -> ServerStreamFuture {
                let code = self.error_code;
                Box::pin(async move {
                    match code {
                        tonic::Code::InvalidArgument => Err(tonic::Status::invalid_argument("Invalid argument")),
                        tonic::Code::FailedPrecondition => {
                            Err(tonic::Status::failed_precondition("Failed precondition"))
                        }
                        tonic::Code::PermissionDenied => Err(tonic::Status::permission_denied("Permission denied")),
                        _ => Err(tonic::Status::internal("Internal error")),
                    }
                })
            }
        }

        // (configured code, expected status code, expected status message).
        // The last row exercises the handler's fallback arm.
        let cases = [
            (
                tonic::Code::InvalidArgument,
                tonic::Code::InvalidArgument,
                "Invalid argument",
            ),
            (
                tonic::Code::FailedPrecondition,
                tonic::Code::FailedPrecondition,
                "Failed precondition",
            ),
            (
                tonic::Code::PermissionDenied,
                tonic::Code::PermissionDenied,
                "Permission denied",
            ),
            (tonic::Code::Unknown, tonic::Code::Internal, "Internal error"),
        ];

        for &(configured, expected_code, expected_message) in &cases {
            let handler = ErrorCodeStreamHandler { error_code: configured };
            let request = make_request("test.ErrorCodeService", "Error", Bytes::new());

            let status = expect_stream_error(handler.call_server_stream(request).await);

            assert_eq!(status.code(), expected_code);
            assert_eq!(status.message(), expected_message);
        }
    }
}