1use bytes::Bytes;
8use futures_util::StreamExt;
9use std::future::Future;
10use std::pin::Pin;
11use tonic::metadata::MetadataMap;
12
13use super::streaming::MessageStream;
14
15#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
20pub enum RpcMode {
21 Unary,
23 ServerStreaming,
25 ClientStreaming,
27 BidirectionalStreaming,
29}
30
31#[derive(Debug, Clone)]
38pub struct GrpcRequestData {
39 pub service_name: String,
41 pub method_name: String,
43 pub payload: Bytes,
45 pub metadata: MetadataMap,
47}
48
49#[derive(Debug, Clone)]
54pub struct GrpcResponseData {
55 pub payload: Bytes,
57 pub metadata: MetadataMap,
59}
60
61pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
67
68pub trait GrpcHandler: Send + Sync {
193 fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send + '_>>;
208
209 fn service_name(&self) -> &str;
224
225 fn rpc_mode(&self) -> RpcMode {
232 RpcMode::Unary
233 }
234
235 fn call_server_stream(
249 &self,
250 request: GrpcRequestData,
251 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send + '_>> {
252 let unary_future = self.call(request);
253 Box::pin(async move {
254 let response = unary_future.await?;
255 Ok(crate::grpc::streaming::single_message_stream(response.payload))
256 })
257 }
258
259 fn call_client_stream(
265 &self,
266 request: crate::grpc::streaming::StreamingRequest,
267 ) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send + '_>> {
268 Box::pin(async move {
269 let crate::grpc::streaming::StreamingRequest {
270 service_name,
271 method_name,
272 mut message_stream,
273 metadata,
274 } = request;
275
276 let first_message = match message_stream.next().await {
277 Some(Ok(message)) => message,
278 Some(Err(status)) => return Err(status),
279 None => {
280 return Err(tonic::Status::invalid_argument(
281 "Client stream is empty; unary fallback requires exactly one request message",
282 ));
283 }
284 };
285
286 if let Some(next_message) = message_stream.next().await {
287 match next_message {
288 Ok(_) => {
289 return Err(tonic::Status::invalid_argument(
290 "Unary fallback requires exactly one request message",
291 ));
292 }
293 Err(status) => return Err(status),
294 }
295 }
296
297 self.call(GrpcRequestData {
298 service_name,
299 method_name,
300 payload: first_message,
301 metadata,
302 })
303 .await
304 })
305 }
306
307 fn call_bidi_stream(
313 &self,
314 request: crate::grpc::streaming::StreamingRequest,
315 ) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send + '_>> {
316 Box::pin(async move {
317 let response = self.call_client_stream(request).await?;
318 Ok(crate::grpc::streaming::single_message_stream(response.payload))
319 })
320 }
321}
322
323#[cfg(test)]
324mod tests {
325 use super::*;
326
327 struct TestGrpcHandler;
328
329 impl GrpcHandler for TestGrpcHandler {
330 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
331 Box::pin(async {
332 Ok(GrpcResponseData {
333 payload: Bytes::from("test response"),
334 metadata: MetadataMap::new(),
335 })
336 })
337 }
338
339 fn service_name(&self) -> &str {
340 "test.TestService"
341 }
342 }
343
344 #[tokio::test]
345 async fn test_grpc_handler_basic_call() {
346 let handler = TestGrpcHandler;
347 let request = GrpcRequestData {
348 service_name: "test.TestService".to_string(),
349 method_name: "TestMethod".to_string(),
350 payload: Bytes::from("test payload"),
351 metadata: MetadataMap::new(),
352 };
353
354 let result = handler.call(request).await;
355 assert!(result.is_ok());
356
357 let response = result.unwrap();
358 assert_eq!(response.payload, Bytes::from("test response"));
359 }
360
361 #[test]
362 fn test_grpc_handler_service_name() {
363 let handler = TestGrpcHandler;
364 assert_eq!(handler.service_name(), "test.TestService");
365 }
366
367 #[test]
368 fn test_grpc_handler_default_rpc_mode() {
369 let handler = TestGrpcHandler;
370 assert_eq!(handler.rpc_mode(), RpcMode::Unary);
371 }
372
373 #[tokio::test]
374 async fn test_grpc_handler_default_server_stream_falls_back_to_unary() {
375 let handler = TestGrpcHandler;
376 let request = GrpcRequestData {
377 service_name: "test.TestService".to_string(),
378 method_name: "StreamMethod".to_string(),
379 payload: Bytes::new(),
380 metadata: MetadataMap::new(),
381 };
382
383 let result = handler.call_server_stream(request).await.unwrap();
384 let collected: Vec<_> = result.collect().await;
385 assert_eq!(collected.len(), 1);
386 assert_eq!(collected[0].as_ref().unwrap(), &Bytes::from("test response"));
387 }
388
389 #[tokio::test]
390 async fn test_grpc_handler_default_client_stream_falls_back_to_unary() {
391 let handler = TestGrpcHandler;
392 let request = crate::grpc::streaming::StreamingRequest {
393 service_name: "test.TestService".to_string(),
394 method_name: "ClientStreamMethod".to_string(),
395 message_stream: crate::grpc::streaming::message_stream_from_vec(vec![Bytes::from("one")]),
396 metadata: MetadataMap::new(),
397 };
398
399 let result = handler.call_client_stream(request).await.unwrap();
400 assert_eq!(result.payload, Bytes::from("test response"));
401 }
402
403 #[tokio::test]
404 async fn test_grpc_handler_default_client_stream_requires_single_message() {
405 let handler = TestGrpcHandler;
406 let request = crate::grpc::streaming::StreamingRequest {
407 service_name: "test.TestService".to_string(),
408 method_name: "ClientStreamMethod".to_string(),
409 message_stream: crate::grpc::streaming::message_stream_from_vec(vec![
410 Bytes::from("one"),
411 Bytes::from("two"),
412 ]),
413 metadata: MetadataMap::new(),
414 };
415
416 let error = handler.call_client_stream(request).await.unwrap_err();
417 assert_eq!(error.code(), tonic::Code::InvalidArgument);
418 assert!(error.message().contains("exactly one"));
419 }
420
421 #[tokio::test]
422 async fn test_grpc_handler_default_bidi_stream_falls_back_to_unary() {
423 let handler = TestGrpcHandler;
424 let request = crate::grpc::streaming::StreamingRequest {
425 service_name: "test.TestService".to_string(),
426 method_name: "BidiMethod".to_string(),
427 message_stream: crate::grpc::streaming::message_stream_from_vec(vec![Bytes::from("ping")]),
428 metadata: MetadataMap::new(),
429 };
430
431 let stream = handler.call_bidi_stream(request).await.unwrap();
432 let collected: Vec<_> = stream.collect().await;
433 assert_eq!(collected.len(), 1);
434 assert_eq!(collected[0].as_ref().unwrap(), &Bytes::from("test response"));
435 }
436
437 #[test]
438 fn test_grpc_request_data_creation() {
439 let request = GrpcRequestData {
440 service_name: "mypackage.MyService".to_string(),
441 method_name: "GetUser".to_string(),
442 payload: Bytes::from("payload"),
443 metadata: MetadataMap::new(),
444 };
445
446 assert_eq!(request.service_name, "mypackage.MyService");
447 assert_eq!(request.method_name, "GetUser");
448 assert_eq!(request.payload, Bytes::from("payload"));
449 }
450
451 #[test]
452 fn test_grpc_response_data_creation() {
453 let response = GrpcResponseData {
454 payload: Bytes::from("response"),
455 metadata: MetadataMap::new(),
456 };
457
458 assert_eq!(response.payload, Bytes::from("response"));
459 assert!(response.metadata.is_empty());
460 }
461
462 #[test]
463 fn test_grpc_request_data_clone() {
464 let original = GrpcRequestData {
465 service_name: "test.Service".to_string(),
466 method_name: "Method".to_string(),
467 payload: Bytes::from("data"),
468 metadata: MetadataMap::new(),
469 };
470
471 let cloned = original.clone();
472 assert_eq!(original.service_name, cloned.service_name);
473 assert_eq!(original.method_name, cloned.method_name);
474 assert_eq!(original.payload, cloned.payload);
475 }
476
477 #[test]
478 fn test_grpc_response_data_clone() {
479 let original = GrpcResponseData {
480 payload: Bytes::from("response data"),
481 metadata: MetadataMap::new(),
482 };
483
484 let cloned = original.clone();
485 assert_eq!(original.payload, cloned.payload);
486 }
487
488 #[tokio::test]
489 async fn test_grpc_handler_error_response() {
490 struct ErrorHandler;
491
492 impl GrpcHandler for ErrorHandler {
493 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
494 Box::pin(async { Err(tonic::Status::not_found("Resource not found")) })
495 }
496
497 fn service_name(&self) -> &str {
498 "test.ErrorService"
499 }
500 }
501
502 let handler = ErrorHandler;
503 let request = GrpcRequestData {
504 service_name: "test.ErrorService".to_string(),
505 method_name: "ErrorMethod".to_string(),
506 payload: Bytes::new(),
507 metadata: MetadataMap::new(),
508 };
509
510 let result = handler.call(request).await;
511 assert!(result.is_err());
512
513 let error = result.unwrap_err();
514 assert_eq!(error.code(), tonic::Code::NotFound);
515 assert_eq!(error.message(), "Resource not found");
516 }
517
518 #[tokio::test]
521 async fn test_server_stream_with_multiple_messages() {
522 use futures_util::StreamExt;
523
524 struct ServerStreamHandler;
525
526 impl GrpcHandler for ServerStreamHandler {
527 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
528 Box::pin(async {
529 Ok(GrpcResponseData {
530 payload: Bytes::from("unary response"),
531 metadata: MetadataMap::new(),
532 })
533 })
534 }
535
536 fn service_name(&self) -> &str {
537 "test.StreamService"
538 }
539
540 fn rpc_mode(&self) -> RpcMode {
541 RpcMode::ServerStreaming
542 }
543
544 fn call_server_stream(
545 &self,
546 _request: GrpcRequestData,
547 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
548 Box::pin(async {
549 let messages = vec![
550 Bytes::from("message1"),
551 Bytes::from("message2"),
552 Bytes::from("message3"),
553 ];
554 Ok(super::super::streaming::message_stream_from_vec(messages))
555 })
556 }
557 }
558
559 let handler = ServerStreamHandler;
560 let request = GrpcRequestData {
561 service_name: "test.StreamService".to_string(),
562 method_name: "StreamMethod".to_string(),
563 payload: Bytes::from("request data"),
564 metadata: MetadataMap::new(),
565 };
566
567 let result = handler.call_server_stream(request).await;
568 assert!(result.is_ok());
569
570 let mut stream = result.unwrap();
571 let msg1 = stream.next().await.unwrap().unwrap();
572 assert_eq!(msg1, Bytes::from("message1"));
573
574 let msg2 = stream.next().await.unwrap().unwrap();
575 assert_eq!(msg2, Bytes::from("message2"));
576
577 let msg3 = stream.next().await.unwrap().unwrap();
578 assert_eq!(msg3, Bytes::from("message3"));
579
580 assert!(stream.next().await.is_none());
581 }
582
583 #[tokio::test]
584 async fn test_server_stream_empty_stream() {
585 use futures_util::StreamExt;
586
587 struct EmptyStreamHandler;
588
589 impl GrpcHandler for EmptyStreamHandler {
590 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
591 Box::pin(async {
592 Ok(GrpcResponseData {
593 payload: Bytes::new(),
594 metadata: MetadataMap::new(),
595 })
596 })
597 }
598
599 fn service_name(&self) -> &str {
600 "test.EmptyStreamService"
601 }
602
603 fn rpc_mode(&self) -> RpcMode {
604 RpcMode::ServerStreaming
605 }
606
607 fn call_server_stream(
608 &self,
609 _request: GrpcRequestData,
610 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
611 Box::pin(async { Ok(super::super::streaming::empty_message_stream()) })
612 }
613 }
614
615 let handler = EmptyStreamHandler;
616 let request = GrpcRequestData {
617 service_name: "test.EmptyStreamService".to_string(),
618 method_name: "EmptyStream".to_string(),
619 payload: Bytes::new(),
620 metadata: MetadataMap::new(),
621 };
622
623 let result = handler.call_server_stream(request).await;
624 assert!(result.is_ok());
625
626 let mut stream = result.unwrap();
627 assert!(stream.next().await.is_none());
628 }
629
630 #[tokio::test]
631 async fn test_server_stream_with_error_mid_stream() {
632 use futures_util::StreamExt;
633
634 struct ErrorMidStreamHandler;
635
636 impl GrpcHandler for ErrorMidStreamHandler {
637 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
638 Box::pin(async {
639 Ok(GrpcResponseData {
640 payload: Bytes::new(),
641 metadata: MetadataMap::new(),
642 })
643 })
644 }
645
646 fn service_name(&self) -> &str {
647 "test.ErrorMidStreamService"
648 }
649
650 fn rpc_mode(&self) -> RpcMode {
651 RpcMode::ServerStreaming
652 }
653
654 fn call_server_stream(
655 &self,
656 _request: GrpcRequestData,
657 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
658 Box::pin(async {
659 let messages = vec![
660 Ok(Bytes::from("message1")),
661 Ok(Bytes::from("message2")),
662 Err(tonic::Status::internal("Stream error")),
663 ];
664 let stream: MessageStream = Box::pin(futures_util::stream::iter(messages));
665 Ok(stream)
666 })
667 }
668 }
669
670 let handler = ErrorMidStreamHandler;
671 let request = GrpcRequestData {
672 service_name: "test.ErrorMidStreamService".to_string(),
673 method_name: "ErrorStream".to_string(),
674 payload: Bytes::new(),
675 metadata: MetadataMap::new(),
676 };
677
678 let result = handler.call_server_stream(request).await;
679 assert!(result.is_ok());
680
681 let mut stream = result.unwrap();
682
683 let msg1 = stream.next().await.unwrap().unwrap();
684 assert_eq!(msg1, Bytes::from("message1"));
685
686 let msg2 = stream.next().await.unwrap().unwrap();
687 assert_eq!(msg2, Bytes::from("message2"));
688
689 let error_result = stream.next().await.unwrap();
690 assert!(error_result.is_err());
691 let error = error_result.unwrap_err();
692 assert_eq!(error.code(), tonic::Code::Internal);
693 assert_eq!(error.message(), "Stream error");
694
695 assert!(stream.next().await.is_none());
696 }
697
698 #[tokio::test]
699 async fn test_server_stream_returns_error() {
700 struct FailingStreamHandler;
701
702 impl GrpcHandler for FailingStreamHandler {
703 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
704 Box::pin(async {
705 Ok(GrpcResponseData {
706 payload: Bytes::new(),
707 metadata: MetadataMap::new(),
708 })
709 })
710 }
711
712 fn service_name(&self) -> &str {
713 "test.FailingStreamService"
714 }
715
716 fn rpc_mode(&self) -> RpcMode {
717 RpcMode::ServerStreaming
718 }
719
720 fn call_server_stream(
721 &self,
722 _request: GrpcRequestData,
723 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
724 Box::pin(async { Err(tonic::Status::unavailable("Stream unavailable")) })
725 }
726 }
727
728 let handler = FailingStreamHandler;
729 let request = GrpcRequestData {
730 service_name: "test.FailingStreamService".to_string(),
731 method_name: "FailingStream".to_string(),
732 payload: Bytes::new(),
733 metadata: MetadataMap::new(),
734 };
735
736 let result = handler.call_server_stream(request).await;
737 assert!(result.is_err());
738
739 if let Err(error) = result {
740 assert_eq!(error.code(), tonic::Code::Unavailable);
741 assert_eq!(error.message(), "Stream unavailable");
742 } else {
743 panic!("Expected error");
744 }
745 }
746
747 #[tokio::test]
748 async fn test_server_stream_with_metadata() {
749 use futures_util::StreamExt;
750
751 struct MetadataStreamHandler;
752
753 impl GrpcHandler for MetadataStreamHandler {
754 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
755 Box::pin(async {
756 Ok(GrpcResponseData {
757 payload: Bytes::new(),
758 metadata: MetadataMap::new(),
759 })
760 })
761 }
762
763 fn service_name(&self) -> &str {
764 "test.MetadataStreamService"
765 }
766
767 fn rpc_mode(&self) -> RpcMode {
768 RpcMode::ServerStreaming
769 }
770
771 fn call_server_stream(
772 &self,
773 request: GrpcRequestData,
774 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
775 Box::pin(async move {
776 assert!(!request.metadata.is_empty());
778 let messages = vec![Bytes::from("metadata_message")];
779 Ok(super::super::streaming::message_stream_from_vec(messages))
780 })
781 }
782 }
783
784 let handler = MetadataStreamHandler;
785 let mut metadata = MetadataMap::new();
786 metadata.insert(
787 "x-request-id",
788 "test-request-123"
789 .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
790 .unwrap(),
791 );
792
793 let request = GrpcRequestData {
794 service_name: "test.MetadataStreamService".to_string(),
795 method_name: "MetadataStream".to_string(),
796 payload: Bytes::new(),
797 metadata,
798 };
799
800 let result = handler.call_server_stream(request).await;
801 assert!(result.is_ok());
802
803 let mut stream = result.unwrap();
804 let msg = stream.next().await.unwrap().unwrap();
805 assert_eq!(msg, Bytes::from("metadata_message"));
806 }
807
808 #[tokio::test]
809 async fn test_server_stream_large_stream_100_messages() {
810 use futures_util::StreamExt;
811
812 struct LargeStreamHandler;
813
814 impl GrpcHandler for LargeStreamHandler {
815 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
816 Box::pin(async {
817 Ok(GrpcResponseData {
818 payload: Bytes::new(),
819 metadata: MetadataMap::new(),
820 })
821 })
822 }
823
824 fn service_name(&self) -> &str {
825 "test.LargeStreamService"
826 }
827
828 fn rpc_mode(&self) -> RpcMode {
829 RpcMode::ServerStreaming
830 }
831
832 fn call_server_stream(
833 &self,
834 _request: GrpcRequestData,
835 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
836 Box::pin(async {
837 let mut messages = Vec::new();
838 for i in 0..100 {
839 messages.push(Bytes::from(format!("message_{}", i)));
840 }
841 Ok(super::super::streaming::message_stream_from_vec(messages))
842 })
843 }
844 }
845
846 let handler = LargeStreamHandler;
847 let request = GrpcRequestData {
848 service_name: "test.LargeStreamService".to_string(),
849 method_name: "LargeStream".to_string(),
850 payload: Bytes::new(),
851 metadata: MetadataMap::new(),
852 };
853
854 let result = handler.call_server_stream(request).await;
855 assert!(result.is_ok());
856
857 let mut stream = result.unwrap();
858 for i in 0..100 {
859 let msg = stream.next().await.unwrap().unwrap();
860 assert_eq!(msg, Bytes::from(format!("message_{}", i)));
861 }
862
863 assert!(stream.next().await.is_none());
864 }
865
866 #[tokio::test]
867 async fn test_server_stream_large_stream_500_messages() {
868 use futures_util::StreamExt;
869
870 struct VeryLargeStreamHandler;
871
872 impl GrpcHandler for VeryLargeStreamHandler {
873 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
874 Box::pin(async {
875 Ok(GrpcResponseData {
876 payload: Bytes::new(),
877 metadata: MetadataMap::new(),
878 })
879 })
880 }
881
882 fn service_name(&self) -> &str {
883 "test.VeryLargeStreamService"
884 }
885
886 fn rpc_mode(&self) -> RpcMode {
887 RpcMode::ServerStreaming
888 }
889
890 fn call_server_stream(
891 &self,
892 _request: GrpcRequestData,
893 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
894 Box::pin(async {
895 let mut messages = Vec::new();
896 for i in 0..500 {
897 messages.push(Bytes::from(format!("msg_{}", i)));
898 }
899 Ok(super::super::streaming::message_stream_from_vec(messages))
900 })
901 }
902 }
903
904 let handler = VeryLargeStreamHandler;
905 let request = GrpcRequestData {
906 service_name: "test.VeryLargeStreamService".to_string(),
907 method_name: "VeryLargeStream".to_string(),
908 payload: Bytes::new(),
909 metadata: MetadataMap::new(),
910 };
911
912 let result = handler.call_server_stream(request).await;
913 assert!(result.is_ok());
914
915 let mut stream = result.unwrap();
916 let mut count = 0;
917 while let Some(item) = stream.next().await {
918 let msg = item.unwrap();
919 assert_eq!(msg, Bytes::from(format!("msg_{}", count)));
920 count += 1;
921 }
922 assert_eq!(count, 500);
923 }
924
925 #[test]
926 fn test_rpc_mode_unary() {
927 let handler = TestGrpcHandler;
928 assert_eq!(handler.rpc_mode(), RpcMode::Unary);
929 }
930
931 #[test]
932 fn test_rpc_mode_server_streaming() {
933 struct ServerStreamTestHandler;
934
935 impl GrpcHandler for ServerStreamTestHandler {
936 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
937 Box::pin(async {
938 Ok(GrpcResponseData {
939 payload: Bytes::new(),
940 metadata: MetadataMap::new(),
941 })
942 })
943 }
944
945 fn service_name(&self) -> &str {
946 "test.ServerStreamTestService"
947 }
948
949 fn rpc_mode(&self) -> RpcMode {
950 RpcMode::ServerStreaming
951 }
952 }
953
954 let handler = ServerStreamTestHandler;
955 assert_eq!(handler.rpc_mode(), RpcMode::ServerStreaming);
956 }
957
958 #[test]
959 fn test_rpc_mode_client_streaming() {
960 struct ClientStreamTestHandler;
961
962 impl GrpcHandler for ClientStreamTestHandler {
963 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
964 Box::pin(async {
965 Ok(GrpcResponseData {
966 payload: Bytes::new(),
967 metadata: MetadataMap::new(),
968 })
969 })
970 }
971
972 fn service_name(&self) -> &str {
973 "test.ClientStreamTestService"
974 }
975
976 fn rpc_mode(&self) -> RpcMode {
977 RpcMode::ClientStreaming
978 }
979 }
980
981 let handler = ClientStreamTestHandler;
982 assert_eq!(handler.rpc_mode(), RpcMode::ClientStreaming);
983 }
984
985 #[test]
986 fn test_rpc_mode_bidirectional_streaming() {
987 struct BiDirectionalStreamTestHandler;
988
989 impl GrpcHandler for BiDirectionalStreamTestHandler {
990 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
991 Box::pin(async {
992 Ok(GrpcResponseData {
993 payload: Bytes::new(),
994 metadata: MetadataMap::new(),
995 })
996 })
997 }
998
999 fn service_name(&self) -> &str {
1000 "test.BiDirectionalStreamTestService"
1001 }
1002
1003 fn rpc_mode(&self) -> RpcMode {
1004 RpcMode::BidirectionalStreaming
1005 }
1006 }
1007
1008 let handler = BiDirectionalStreamTestHandler;
1009 assert_eq!(handler.rpc_mode(), RpcMode::BidirectionalStreaming);
1010 }
1011
1012 #[tokio::test]
1013 async fn test_server_stream_single_message() {
1014 use futures_util::StreamExt;
1015
1016 struct SingleMessageStreamHandler;
1017
1018 impl GrpcHandler for SingleMessageStreamHandler {
1019 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1020 Box::pin(async {
1021 Ok(GrpcResponseData {
1022 payload: Bytes::new(),
1023 metadata: MetadataMap::new(),
1024 })
1025 })
1026 }
1027
1028 fn service_name(&self) -> &str {
1029 "test.SingleMessageStreamService"
1030 }
1031
1032 fn rpc_mode(&self) -> RpcMode {
1033 RpcMode::ServerStreaming
1034 }
1035
1036 fn call_server_stream(
1037 &self,
1038 _request: GrpcRequestData,
1039 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1040 Box::pin(async {
1041 Ok(super::super::streaming::single_message_stream(Bytes::from(
1042 "single_msg",
1043 )))
1044 })
1045 }
1046 }
1047
1048 let handler = SingleMessageStreamHandler;
1049 let request = GrpcRequestData {
1050 service_name: "test.SingleMessageStreamService".to_string(),
1051 method_name: "SingleMessage".to_string(),
1052 payload: Bytes::new(),
1053 metadata: MetadataMap::new(),
1054 };
1055
1056 let result = handler.call_server_stream(request).await;
1057 assert!(result.is_ok());
1058
1059 let mut stream = result.unwrap();
1060 let msg = stream.next().await.unwrap().unwrap();
1061 assert_eq!(msg, Bytes::from("single_msg"));
1062 assert!(stream.next().await.is_none());
1063 }
1064
1065 #[tokio::test]
1066 async fn test_server_stream_preserves_request_data() {
1067 use futures_util::StreamExt;
1068
1069 struct RequestPreservingStreamHandler;
1070
1071 impl GrpcHandler for RequestPreservingStreamHandler {
1072 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1073 Box::pin(async {
1074 Ok(GrpcResponseData {
1075 payload: Bytes::new(),
1076 metadata: MetadataMap::new(),
1077 })
1078 })
1079 }
1080
1081 fn service_name(&self) -> &str {
1082 "test.RequestPreservingService"
1083 }
1084
1085 fn rpc_mode(&self) -> RpcMode {
1086 RpcMode::ServerStreaming
1087 }
1088
1089 fn call_server_stream(
1090 &self,
1091 request: GrpcRequestData,
1092 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1093 Box::pin(async move {
1094 assert_eq!(request.service_name, "test.RequestPreservingService");
1096 assert_eq!(request.method_name, "PreserveTest");
1097 assert_eq!(request.payload, Bytes::from("test_payload"));
1098
1099 let messages = vec![Bytes::from("response")];
1100 Ok(super::super::streaming::message_stream_from_vec(messages))
1101 })
1102 }
1103 }
1104
1105 let handler = RequestPreservingStreamHandler;
1106 let request = GrpcRequestData {
1107 service_name: "test.RequestPreservingService".to_string(),
1108 method_name: "PreserveTest".to_string(),
1109 payload: Bytes::from("test_payload"),
1110 metadata: MetadataMap::new(),
1111 };
1112
1113 let result = handler.call_server_stream(request).await;
1114 assert!(result.is_ok());
1115
1116 let mut stream = result.unwrap();
1117 let msg = stream.next().await.unwrap().unwrap();
1118 assert_eq!(msg, Bytes::from("response"));
1119 }
1120
1121 #[tokio::test]
1122 async fn test_server_stream_with_various_error_codes() {
1123 struct ErrorCodeStreamHandler {
1124 error_code: tonic::Code,
1125 }
1126
1127 impl GrpcHandler for ErrorCodeStreamHandler {
1128 fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1129 Box::pin(async {
1130 Ok(GrpcResponseData {
1131 payload: Bytes::new(),
1132 metadata: MetadataMap::new(),
1133 })
1134 })
1135 }
1136
1137 fn service_name(&self) -> &str {
1138 "test.ErrorCodeService"
1139 }
1140
1141 fn rpc_mode(&self) -> RpcMode {
1142 RpcMode::ServerStreaming
1143 }
1144
1145 fn call_server_stream(
1146 &self,
1147 _request: GrpcRequestData,
1148 ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1149 let code = self.error_code;
1150 Box::pin(async move {
1151 match code {
1152 tonic::Code::InvalidArgument => Err(tonic::Status::invalid_argument("Invalid argument")),
1153 tonic::Code::FailedPrecondition => {
1154 Err(tonic::Status::failed_precondition("Failed precondition"))
1155 }
1156 tonic::Code::PermissionDenied => Err(tonic::Status::permission_denied("Permission denied")),
1157 _ => Err(tonic::Status::internal("Internal error")),
1158 }
1159 })
1160 }
1161 }
1162
1163 let handler = ErrorCodeStreamHandler {
1165 error_code: tonic::Code::InvalidArgument,
1166 };
1167 let request = GrpcRequestData {
1168 service_name: "test.ErrorCodeService".to_string(),
1169 method_name: "Error".to_string(),
1170 payload: Bytes::new(),
1171 metadata: MetadataMap::new(),
1172 };
1173 let result = handler.call_server_stream(request).await;
1174 assert!(result.is_err());
1175 if let Err(error) = result {
1176 assert_eq!(error.code(), tonic::Code::InvalidArgument);
1177 }
1178
1179 let handler = ErrorCodeStreamHandler {
1181 error_code: tonic::Code::FailedPrecondition,
1182 };
1183 let request = GrpcRequestData {
1184 service_name: "test.ErrorCodeService".to_string(),
1185 method_name: "Error".to_string(),
1186 payload: Bytes::new(),
1187 metadata: MetadataMap::new(),
1188 };
1189 let result = handler.call_server_stream(request).await;
1190 assert!(result.is_err());
1191 if let Err(error) = result {
1192 assert_eq!(error.code(), tonic::Code::FailedPrecondition);
1193 }
1194
1195 let handler = ErrorCodeStreamHandler {
1197 error_code: tonic::Code::PermissionDenied,
1198 };
1199 let request = GrpcRequestData {
1200 service_name: "test.ErrorCodeService".to_string(),
1201 method_name: "Error".to_string(),
1202 payload: Bytes::new(),
1203 metadata: MetadataMap::new(),
1204 };
1205 let result = handler.call_server_stream(request).await;
1206 assert!(result.is_err());
1207 if let Err(error) = result {
1208 assert_eq!(error.code(), tonic::Code::PermissionDenied);
1209 }
1210 }
1211}