Skip to main content

spikard_http/grpc/
handler.rs

//! Core `GrpcHandler` trait for language-agnostic gRPC request handling
//!
//! This module defines the handler trait that language bindings implement
//! to handle gRPC requests. Similar to the `HttpHandler` pattern but designed
//! specifically for gRPC's protobuf-based message format.
6
7use bytes::Bytes;
8use std::future::Future;
9use std::pin::Pin;
10use tonic::metadata::MetadataMap;
11
12use super::streaming::MessageStream;
13
/// The kind of RPC a handler declares that it serves.
///
/// A handler reports one of these values from `GrpcHandler::rpc_mode()`; the
/// value is used at handler registration to route requests to the matching
/// handler method.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcMode {
    /// Unary RPC: one request message in, one response message out.
    Unary,
    /// Server streaming RPC: one request message in, a stream of responses out.
    ServerStreaming,
    /// Client streaming RPC: a stream of requests in, one response message out.
    ClientStreaming,
    /// Bidirectional streaming RPC: a stream of requests in, a stream of responses out.
    BidirectionalStreaming,
}
29
30/// gRPC request data passed to handlers
31///
32/// Contains the parsed components of a gRPC request:
33/// - Service and method names from the request path
34/// - Serialized protobuf payload as bytes
35/// - Request metadata (headers)
36#[derive(Debug, Clone)]
37pub struct GrpcRequestData {
38    /// Fully qualified service name (e.g., "mypackage.MyService")
39    pub service_name: String,
40    /// Method name (e.g., "GetUser")
41    pub method_name: String,
42    /// Serialized protobuf message bytes
43    pub payload: Bytes,
44    /// gRPC metadata (similar to HTTP headers)
45    pub metadata: MetadataMap,
46}
47
48/// gRPC response data returned by handlers
49///
50/// Contains the serialized protobuf response and any metadata to include
51/// in the response headers.
52#[derive(Debug, Clone)]
53pub struct GrpcResponseData {
54    /// Serialized protobuf message bytes
55    pub payload: Bytes,
56    /// gRPC metadata to include in response (similar to HTTP headers)
57    pub metadata: MetadataMap,
58}
59
60/// Result type for gRPC handlers
61///
62/// Returns either:
63/// - Ok(GrpcResponseData): A successful response with payload and metadata
64/// - Err(tonic::Status): A gRPC error status with code and message
65pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
66
67/// Handler trait for gRPC requests
68///
69/// This is the language-agnostic interface that all gRPC handler implementations
70/// must satisfy. Language bindings (Python, TypeScript, Ruby, PHP) will implement
71/// this trait to bridge their runtime to Spikard's gRPC server.
72///
73/// Handlers declare their RPC mode (unary vs streaming) via the `rpc_mode()` method.
74/// The gRPC server uses this to route requests to either `call()` or `call_server_stream()`.
75///
76/// # Examples
77///
78/// ## Basic unary handler
79///
80/// ```ignore
81/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, GrpcResponseData, GrpcHandlerResult};
82/// use bytes::Bytes;
83/// use std::pin::Pin;
84/// use std::future::Future;
85///
86/// struct UnaryHandler;
87///
88/// impl GrpcHandler for UnaryHandler {
89///     fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
90///         Box::pin(async move {
91///             // Parse request.payload using protobuf deserialization
92///             let user_id = extract_id_from_payload(&request.payload);
93///
94///             // Process business logic
95///             let response_data = lookup_user(user_id).await?;
96///
97///             // Serialize response and return
98///             Ok(GrpcResponseData {
99///                 payload: serialize_user(&response_data),
100///                 metadata: tonic::metadata::MetadataMap::new(),
101///             })
102///         })
103///     }
104///
105///     fn service_name(&self) -> &str {
106///         "users.UserService"
107///     }
108///
109///     // Default rpc_mode() returns RpcMode::Unary
110/// }
111/// ```
112///
113/// ## Server streaming handler
114///
115/// ```ignore
116/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, MessageStream};
117/// use crate::grpc::streaming::message_stream_from_vec;
118/// use bytes::Bytes;
119/// use std::pin::Pin;
120/// use std::future::Future;
121///
122/// struct StreamingHandler;
123///
124/// impl GrpcHandler for StreamingHandler {
125///     fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send>> {
126///         // Unary call not used for streaming handlers, but must be implemented
127///         Box::pin(async {
128///             Err(tonic::Status::unimplemented("Use server streaming instead"))
129///         })
130///     }
131///
132///     fn service_name(&self) -> &str {
133///         "events.EventService"
134///     }
135///
136///     fn rpc_mode(&self) -> RpcMode {
137///         RpcMode::ServerStreaming
138///     }
139///
140///     fn call_server_stream(
141///         &self,
142///         request: GrpcRequestData,
143///     ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
144///         Box::pin(async move {
145///             // Parse request to extract stream criteria (e.g., user_id)
146///             let user_id = extract_id_from_payload(&request.payload);
147///
148///             // Generate messages (e.g., fetch events from database)
149///             let events = fetch_user_events(user_id).await?;
150///             let mut messages = Vec::new();
151///
152///             for event in events {
153///                 let serialized = serialize_event(&event);
154///                 messages.push(serialized);
155///             }
156///
157///             // Convert to stream and return
158///             Ok(message_stream_from_vec(messages))
159///         })
160///     }
161/// }
162/// ```
163///
164/// # Dispatch Behavior
165///
166/// The gRPC server uses `rpc_mode()` to determine which handler method to call:
167///
168/// | RpcMode | Handler Method | Use Case |
169/// |---------|---|---|
170/// | `Unary` | `call()` | Single request, single response |
171/// | `ServerStreaming` | `call_server_stream()` | Single request, multiple responses |
172/// | `ClientStreaming` | Not yet implemented | Multiple requests, single response |
173/// | `BidirectionalStreaming` | Not yet implemented | Multiple requests, multiple responses |
174///
175/// # Error Handling
176///
177/// Both `call()` and `call_server_stream()` return gRPC error status values:
178///
179/// ```ignore
180/// // Return a specific gRPC error
181/// fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
182///     Box::pin(async {
183///         let Some(id) = parse_id(&request.payload) else {
184///             return Err(tonic::Status::invalid_argument("Missing user ID"));
185///         };
186///
187///         // ... process ...
188///     })
189/// }
190/// ```
191pub trait GrpcHandler: Send + Sync {
192    /// Handle a gRPC request
193    ///
194    /// Takes the parsed request data and returns a future that resolves to either:
195    /// - Ok(GrpcResponseData): A successful response
196    /// - Err(tonic::Status): An error with appropriate gRPC status code
197    ///
198    /// # Arguments
199    ///
200    /// * `request` - The parsed gRPC request containing service/method names,
201    ///   serialized payload, and metadata
202    ///
203    /// # Returns
204    ///
205    /// A future that resolves to a GrpcHandlerResult
206    fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>>;
207
208    /// Get the fully qualified service name this handler serves
209    ///
210    /// This is used for routing requests to the appropriate handler.
211    /// Should return the fully qualified service name as defined in the .proto file.
212    ///
213    /// # Example
214    ///
215    /// For a service defined as:
216    /// ```proto
217    /// package mypackage;
218    /// service UserService { ... }
219    /// ```
220    ///
221    /// This should return "mypackage.UserService"
222    fn service_name(&self) -> &str;
223
224    /// Get the RPC mode this handler supports
225    ///
226    /// Returns the type of RPC this handler implements. Used at handler registration
227    /// to route requests to the appropriate handler method.
228    ///
229    /// Default implementation returns `RpcMode::Unary` for backward compatibility.
230    fn rpc_mode(&self) -> RpcMode {
231        RpcMode::Unary
232    }
233
234    /// Handle a server streaming RPC request
235    ///
236    /// Takes a single request and returns a stream of response messages.
237    /// Default implementation returns `UNIMPLEMENTED` status.
238    ///
239    /// # Arguments
240    ///
241    /// * `request` - The parsed gRPC request
242    ///
243    /// # Returns
244    ///
245    /// A future that resolves to either a stream of messages or an error status
246    fn call_server_stream(
247        &self,
248        _request: GrpcRequestData,
249    ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
250        Box::pin(async { Err(tonic::Status::unimplemented("Server streaming not supported")) })
251    }
252
253    /// Handle a client streaming RPC call
254    ///
255    /// Takes a stream of request messages and returns a single response message.
256    /// Default implementation returns `UNIMPLEMENTED` status.
257    fn call_client_stream(
258        &self,
259        _request: crate::grpc::streaming::StreamingRequest,
260    ) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send>> {
261        Box::pin(async { Err(tonic::Status::unimplemented("Client streaming not supported")) })
262    }
263
264    /// Handle a bidirectional streaming RPC call
265    ///
266    /// Takes a stream of request messages and returns a stream of response messages.
267    /// Default implementation returns `UNIMPLEMENTED` status.
268    fn call_bidi_stream(
269        &self,
270        _request: crate::grpc::streaming::StreamingRequest,
271    ) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send>> {
272        Box::pin(async { Err(tonic::Status::unimplemented("Bidirectional streaming not supported")) })
273    }
274}
275
276#[cfg(test)]
277mod tests {
278    use super::*;
279
280    struct TestGrpcHandler;
281
282    impl GrpcHandler for TestGrpcHandler {
283        fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
284            Box::pin(async {
285                Ok(GrpcResponseData {
286                    payload: Bytes::from("test response"),
287                    metadata: MetadataMap::new(),
288                })
289            })
290        }
291
292        fn service_name(&self) -> &str {
293            "test.TestService"
294        }
295    }
296
297    #[tokio::test]
298    async fn test_grpc_handler_basic_call() {
299        let handler = TestGrpcHandler;
300        let request = GrpcRequestData {
301            service_name: "test.TestService".to_string(),
302            method_name: "TestMethod".to_string(),
303            payload: Bytes::from("test payload"),
304            metadata: MetadataMap::new(),
305        };
306
307        let result = handler.call(request).await;
308        assert!(result.is_ok());
309
310        let response = result.unwrap();
311        assert_eq!(response.payload, Bytes::from("test response"));
312    }
313
314    #[test]
315    fn test_grpc_handler_service_name() {
316        let handler = TestGrpcHandler;
317        assert_eq!(handler.service_name(), "test.TestService");
318    }
319
320    #[test]
321    fn test_grpc_handler_default_rpc_mode() {
322        let handler = TestGrpcHandler;
323        assert_eq!(handler.rpc_mode(), RpcMode::Unary);
324    }
325
326    #[tokio::test]
327    async fn test_grpc_handler_default_server_stream_unimplemented() {
328        let handler = TestGrpcHandler;
329        let request = GrpcRequestData {
330            service_name: "test.TestService".to_string(),
331            method_name: "StreamMethod".to_string(),
332            payload: Bytes::new(),
333            metadata: MetadataMap::new(),
334        };
335
336        let result = handler.call_server_stream(request).await;
337        assert!(result.is_err());
338
339        match result {
340            Err(error) => {
341                assert_eq!(error.code(), tonic::Code::Unimplemented);
342                assert_eq!(error.message(), "Server streaming not supported");
343            }
344            Ok(_) => panic!("Expected error, got Ok"),
345        }
346    }
347
348    #[test]
349    fn test_grpc_request_data_creation() {
350        let request = GrpcRequestData {
351            service_name: "mypackage.MyService".to_string(),
352            method_name: "GetUser".to_string(),
353            payload: Bytes::from("payload"),
354            metadata: MetadataMap::new(),
355        };
356
357        assert_eq!(request.service_name, "mypackage.MyService");
358        assert_eq!(request.method_name, "GetUser");
359        assert_eq!(request.payload, Bytes::from("payload"));
360    }
361
362    #[test]
363    fn test_grpc_response_data_creation() {
364        let response = GrpcResponseData {
365            payload: Bytes::from("response"),
366            metadata: MetadataMap::new(),
367        };
368
369        assert_eq!(response.payload, Bytes::from("response"));
370        assert!(response.metadata.is_empty());
371    }
372
373    #[test]
374    fn test_grpc_request_data_clone() {
375        let original = GrpcRequestData {
376            service_name: "test.Service".to_string(),
377            method_name: "Method".to_string(),
378            payload: Bytes::from("data"),
379            metadata: MetadataMap::new(),
380        };
381
382        let cloned = original.clone();
383        assert_eq!(original.service_name, cloned.service_name);
384        assert_eq!(original.method_name, cloned.method_name);
385        assert_eq!(original.payload, cloned.payload);
386    }
387
388    #[test]
389    fn test_grpc_response_data_clone() {
390        let original = GrpcResponseData {
391            payload: Bytes::from("response data"),
392            metadata: MetadataMap::new(),
393        };
394
395        let cloned = original.clone();
396        assert_eq!(original.payload, cloned.payload);
397    }
398
399    #[tokio::test]
400    async fn test_grpc_handler_error_response() {
401        struct ErrorHandler;
402
403        impl GrpcHandler for ErrorHandler {
404            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
405                Box::pin(async { Err(tonic::Status::not_found("Resource not found")) })
406            }
407
408            fn service_name(&self) -> &str {
409                "test.ErrorService"
410            }
411        }
412
413        let handler = ErrorHandler;
414        let request = GrpcRequestData {
415            service_name: "test.ErrorService".to_string(),
416            method_name: "ErrorMethod".to_string(),
417            payload: Bytes::new(),
418            metadata: MetadataMap::new(),
419        };
420
421        let result = handler.call(request).await;
422        assert!(result.is_err());
423
424        let error = result.unwrap_err();
425        assert_eq!(error.code(), tonic::Code::NotFound);
426        assert_eq!(error.message(), "Resource not found");
427    }
428
429    // ==================== Server Streaming Tests ====================
430
431    #[tokio::test]
432    async fn test_server_stream_with_multiple_messages() {
433        use futures_util::StreamExt;
434
435        struct ServerStreamHandler;
436
437        impl GrpcHandler for ServerStreamHandler {
438            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
439                Box::pin(async {
440                    Ok(GrpcResponseData {
441                        payload: Bytes::from("unary response"),
442                        metadata: MetadataMap::new(),
443                    })
444                })
445            }
446
447            fn service_name(&self) -> &str {
448                "test.StreamService"
449            }
450
451            fn rpc_mode(&self) -> RpcMode {
452                RpcMode::ServerStreaming
453            }
454
455            fn call_server_stream(
456                &self,
457                _request: GrpcRequestData,
458            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
459                Box::pin(async {
460                    let messages = vec![
461                        Bytes::from("message1"),
462                        Bytes::from("message2"),
463                        Bytes::from("message3"),
464                    ];
465                    Ok(super::super::streaming::message_stream_from_vec(messages))
466                })
467            }
468        }
469
470        let handler = ServerStreamHandler;
471        let request = GrpcRequestData {
472            service_name: "test.StreamService".to_string(),
473            method_name: "StreamMethod".to_string(),
474            payload: Bytes::from("request data"),
475            metadata: MetadataMap::new(),
476        };
477
478        let result = handler.call_server_stream(request).await;
479        assert!(result.is_ok());
480
481        let mut stream = result.unwrap();
482        let msg1 = stream.next().await.unwrap().unwrap();
483        assert_eq!(msg1, Bytes::from("message1"));
484
485        let msg2 = stream.next().await.unwrap().unwrap();
486        assert_eq!(msg2, Bytes::from("message2"));
487
488        let msg3 = stream.next().await.unwrap().unwrap();
489        assert_eq!(msg3, Bytes::from("message3"));
490
491        assert!(stream.next().await.is_none());
492    }
493
494    #[tokio::test]
495    async fn test_server_stream_empty_stream() {
496        use futures_util::StreamExt;
497
498        struct EmptyStreamHandler;
499
500        impl GrpcHandler for EmptyStreamHandler {
501            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
502                Box::pin(async {
503                    Ok(GrpcResponseData {
504                        payload: Bytes::new(),
505                        metadata: MetadataMap::new(),
506                    })
507                })
508            }
509
510            fn service_name(&self) -> &str {
511                "test.EmptyStreamService"
512            }
513
514            fn rpc_mode(&self) -> RpcMode {
515                RpcMode::ServerStreaming
516            }
517
518            fn call_server_stream(
519                &self,
520                _request: GrpcRequestData,
521            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
522                Box::pin(async { Ok(super::super::streaming::empty_message_stream()) })
523            }
524        }
525
526        let handler = EmptyStreamHandler;
527        let request = GrpcRequestData {
528            service_name: "test.EmptyStreamService".to_string(),
529            method_name: "EmptyStream".to_string(),
530            payload: Bytes::new(),
531            metadata: MetadataMap::new(),
532        };
533
534        let result = handler.call_server_stream(request).await;
535        assert!(result.is_ok());
536
537        let mut stream = result.unwrap();
538        assert!(stream.next().await.is_none());
539    }
540
541    #[tokio::test]
542    async fn test_server_stream_with_error_mid_stream() {
543        use futures_util::StreamExt;
544
545        struct ErrorMidStreamHandler;
546
547        impl GrpcHandler for ErrorMidStreamHandler {
548            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
549                Box::pin(async {
550                    Ok(GrpcResponseData {
551                        payload: Bytes::new(),
552                        metadata: MetadataMap::new(),
553                    })
554                })
555            }
556
557            fn service_name(&self) -> &str {
558                "test.ErrorMidStreamService"
559            }
560
561            fn rpc_mode(&self) -> RpcMode {
562                RpcMode::ServerStreaming
563            }
564
565            fn call_server_stream(
566                &self,
567                _request: GrpcRequestData,
568            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
569                Box::pin(async {
570                    let messages = vec![
571                        Ok(Bytes::from("message1")),
572                        Ok(Bytes::from("message2")),
573                        Err(tonic::Status::internal("Stream error")),
574                    ];
575                    let stream: MessageStream = Box::pin(futures_util::stream::iter(messages));
576                    Ok(stream)
577                })
578            }
579        }
580
581        let handler = ErrorMidStreamHandler;
582        let request = GrpcRequestData {
583            service_name: "test.ErrorMidStreamService".to_string(),
584            method_name: "ErrorStream".to_string(),
585            payload: Bytes::new(),
586            metadata: MetadataMap::new(),
587        };
588
589        let result = handler.call_server_stream(request).await;
590        assert!(result.is_ok());
591
592        let mut stream = result.unwrap();
593
594        let msg1 = stream.next().await.unwrap().unwrap();
595        assert_eq!(msg1, Bytes::from("message1"));
596
597        let msg2 = stream.next().await.unwrap().unwrap();
598        assert_eq!(msg2, Bytes::from("message2"));
599
600        let error_result = stream.next().await.unwrap();
601        assert!(error_result.is_err());
602        let error = error_result.unwrap_err();
603        assert_eq!(error.code(), tonic::Code::Internal);
604        assert_eq!(error.message(), "Stream error");
605
606        assert!(stream.next().await.is_none());
607    }
608
609    #[tokio::test]
610    async fn test_server_stream_returns_error() {
611        struct FailingStreamHandler;
612
613        impl GrpcHandler for FailingStreamHandler {
614            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
615                Box::pin(async {
616                    Ok(GrpcResponseData {
617                        payload: Bytes::new(),
618                        metadata: MetadataMap::new(),
619                    })
620                })
621            }
622
623            fn service_name(&self) -> &str {
624                "test.FailingStreamService"
625            }
626
627            fn rpc_mode(&self) -> RpcMode {
628                RpcMode::ServerStreaming
629            }
630
631            fn call_server_stream(
632                &self,
633                _request: GrpcRequestData,
634            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
635                Box::pin(async { Err(tonic::Status::unavailable("Stream unavailable")) })
636            }
637        }
638
639        let handler = FailingStreamHandler;
640        let request = GrpcRequestData {
641            service_name: "test.FailingStreamService".to_string(),
642            method_name: "FailingStream".to_string(),
643            payload: Bytes::new(),
644            metadata: MetadataMap::new(),
645        };
646
647        let result = handler.call_server_stream(request).await;
648        assert!(result.is_err());
649
650        if let Err(error) = result {
651            assert_eq!(error.code(), tonic::Code::Unavailable);
652            assert_eq!(error.message(), "Stream unavailable");
653        } else {
654            panic!("Expected error");
655        }
656    }
657
658    #[tokio::test]
659    async fn test_server_stream_with_metadata() {
660        use futures_util::StreamExt;
661
662        struct MetadataStreamHandler;
663
664        impl GrpcHandler for MetadataStreamHandler {
665            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
666                Box::pin(async {
667                    Ok(GrpcResponseData {
668                        payload: Bytes::new(),
669                        metadata: MetadataMap::new(),
670                    })
671                })
672            }
673
674            fn service_name(&self) -> &str {
675                "test.MetadataStreamService"
676            }
677
678            fn rpc_mode(&self) -> RpcMode {
679                RpcMode::ServerStreaming
680            }
681
682            fn call_server_stream(
683                &self,
684                request: GrpcRequestData,
685            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
686                Box::pin(async move {
687                    // Verify metadata is received
688                    assert!(!request.metadata.is_empty());
689                    let messages = vec![Bytes::from("metadata_message")];
690                    Ok(super::super::streaming::message_stream_from_vec(messages))
691                })
692            }
693        }
694
695        let handler = MetadataStreamHandler;
696        let mut metadata = MetadataMap::new();
697        metadata.insert(
698            "x-request-id",
699            "test-request-123"
700                .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
701                .unwrap(),
702        );
703
704        let request = GrpcRequestData {
705            service_name: "test.MetadataStreamService".to_string(),
706            method_name: "MetadataStream".to_string(),
707            payload: Bytes::new(),
708            metadata,
709        };
710
711        let result = handler.call_server_stream(request).await;
712        assert!(result.is_ok());
713
714        let mut stream = result.unwrap();
715        let msg = stream.next().await.unwrap().unwrap();
716        assert_eq!(msg, Bytes::from("metadata_message"));
717    }
718
719    #[tokio::test]
720    async fn test_server_stream_large_stream_100_messages() {
721        use futures_util::StreamExt;
722
723        struct LargeStreamHandler;
724
725        impl GrpcHandler for LargeStreamHandler {
726            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
727                Box::pin(async {
728                    Ok(GrpcResponseData {
729                        payload: Bytes::new(),
730                        metadata: MetadataMap::new(),
731                    })
732                })
733            }
734
735            fn service_name(&self) -> &str {
736                "test.LargeStreamService"
737            }
738
739            fn rpc_mode(&self) -> RpcMode {
740                RpcMode::ServerStreaming
741            }
742
743            fn call_server_stream(
744                &self,
745                _request: GrpcRequestData,
746            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
747                Box::pin(async {
748                    let mut messages = Vec::new();
749                    for i in 0..100 {
750                        messages.push(Bytes::from(format!("message_{}", i)));
751                    }
752                    Ok(super::super::streaming::message_stream_from_vec(messages))
753                })
754            }
755        }
756
757        let handler = LargeStreamHandler;
758        let request = GrpcRequestData {
759            service_name: "test.LargeStreamService".to_string(),
760            method_name: "LargeStream".to_string(),
761            payload: Bytes::new(),
762            metadata: MetadataMap::new(),
763        };
764
765        let result = handler.call_server_stream(request).await;
766        assert!(result.is_ok());
767
768        let mut stream = result.unwrap();
769        for i in 0..100 {
770            let msg = stream.next().await.unwrap().unwrap();
771            assert_eq!(msg, Bytes::from(format!("message_{}", i)));
772        }
773
774        assert!(stream.next().await.is_none());
775    }
776
777    #[tokio::test]
778    async fn test_server_stream_large_stream_500_messages() {
779        use futures_util::StreamExt;
780
781        struct VeryLargeStreamHandler;
782
783        impl GrpcHandler for VeryLargeStreamHandler {
784            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
785                Box::pin(async {
786                    Ok(GrpcResponseData {
787                        payload: Bytes::new(),
788                        metadata: MetadataMap::new(),
789                    })
790                })
791            }
792
793            fn service_name(&self) -> &str {
794                "test.VeryLargeStreamService"
795            }
796
797            fn rpc_mode(&self) -> RpcMode {
798                RpcMode::ServerStreaming
799            }
800
801            fn call_server_stream(
802                &self,
803                _request: GrpcRequestData,
804            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
805                Box::pin(async {
806                    let mut messages = Vec::new();
807                    for i in 0..500 {
808                        messages.push(Bytes::from(format!("msg_{}", i)));
809                    }
810                    Ok(super::super::streaming::message_stream_from_vec(messages))
811                })
812            }
813        }
814
815        let handler = VeryLargeStreamHandler;
816        let request = GrpcRequestData {
817            service_name: "test.VeryLargeStreamService".to_string(),
818            method_name: "VeryLargeStream".to_string(),
819            payload: Bytes::new(),
820            metadata: MetadataMap::new(),
821        };
822
823        let result = handler.call_server_stream(request).await;
824        assert!(result.is_ok());
825
826        let mut stream = result.unwrap();
827        let mut count = 0;
828        while let Some(item) = stream.next().await {
829            let msg = item.unwrap();
830            assert_eq!(msg, Bytes::from(format!("msg_{}", count)));
831            count += 1;
832        }
833        assert_eq!(count, 500);
834    }
835
836    #[test]
837    fn test_rpc_mode_unary() {
838        let handler = TestGrpcHandler;
839        assert_eq!(handler.rpc_mode(), RpcMode::Unary);
840    }
841
842    #[test]
843    fn test_rpc_mode_server_streaming() {
844        struct ServerStreamTestHandler;
845
846        impl GrpcHandler for ServerStreamTestHandler {
847            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
848                Box::pin(async {
849                    Ok(GrpcResponseData {
850                        payload: Bytes::new(),
851                        metadata: MetadataMap::new(),
852                    })
853                })
854            }
855
856            fn service_name(&self) -> &str {
857                "test.ServerStreamTestService"
858            }
859
860            fn rpc_mode(&self) -> RpcMode {
861                RpcMode::ServerStreaming
862            }
863        }
864
865        let handler = ServerStreamTestHandler;
866        assert_eq!(handler.rpc_mode(), RpcMode::ServerStreaming);
867    }
868
869    #[test]
870    fn test_rpc_mode_client_streaming() {
871        struct ClientStreamTestHandler;
872
873        impl GrpcHandler for ClientStreamTestHandler {
874            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
875                Box::pin(async {
876                    Ok(GrpcResponseData {
877                        payload: Bytes::new(),
878                        metadata: MetadataMap::new(),
879                    })
880                })
881            }
882
883            fn service_name(&self) -> &str {
884                "test.ClientStreamTestService"
885            }
886
887            fn rpc_mode(&self) -> RpcMode {
888                RpcMode::ClientStreaming
889            }
890        }
891
892        let handler = ClientStreamTestHandler;
893        assert_eq!(handler.rpc_mode(), RpcMode::ClientStreaming);
894    }
895
896    #[test]
897    fn test_rpc_mode_bidirectional_streaming() {
898        struct BiDirectionalStreamTestHandler;
899
900        impl GrpcHandler for BiDirectionalStreamTestHandler {
901            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
902                Box::pin(async {
903                    Ok(GrpcResponseData {
904                        payload: Bytes::new(),
905                        metadata: MetadataMap::new(),
906                    })
907                })
908            }
909
910            fn service_name(&self) -> &str {
911                "test.BiDirectionalStreamTestService"
912            }
913
914            fn rpc_mode(&self) -> RpcMode {
915                RpcMode::BidirectionalStreaming
916            }
917        }
918
919        let handler = BiDirectionalStreamTestHandler;
920        assert_eq!(handler.rpc_mode(), RpcMode::BidirectionalStreaming);
921    }
922
923    #[tokio::test]
924    async fn test_server_stream_single_message() {
925        use futures_util::StreamExt;
926
927        struct SingleMessageStreamHandler;
928
929        impl GrpcHandler for SingleMessageStreamHandler {
930            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
931                Box::pin(async {
932                    Ok(GrpcResponseData {
933                        payload: Bytes::new(),
934                        metadata: MetadataMap::new(),
935                    })
936                })
937            }
938
939            fn service_name(&self) -> &str {
940                "test.SingleMessageStreamService"
941            }
942
943            fn rpc_mode(&self) -> RpcMode {
944                RpcMode::ServerStreaming
945            }
946
947            fn call_server_stream(
948                &self,
949                _request: GrpcRequestData,
950            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
951                Box::pin(async {
952                    Ok(super::super::streaming::single_message_stream(Bytes::from(
953                        "single_msg",
954                    )))
955                })
956            }
957        }
958
959        let handler = SingleMessageStreamHandler;
960        let request = GrpcRequestData {
961            service_name: "test.SingleMessageStreamService".to_string(),
962            method_name: "SingleMessage".to_string(),
963            payload: Bytes::new(),
964            metadata: MetadataMap::new(),
965        };
966
967        let result = handler.call_server_stream(request).await;
968        assert!(result.is_ok());
969
970        let mut stream = result.unwrap();
971        let msg = stream.next().await.unwrap().unwrap();
972        assert_eq!(msg, Bytes::from("single_msg"));
973        assert!(stream.next().await.is_none());
974    }
975
976    #[tokio::test]
977    async fn test_server_stream_preserves_request_data() {
978        use futures_util::StreamExt;
979
980        struct RequestPreservingStreamHandler;
981
982        impl GrpcHandler for RequestPreservingStreamHandler {
983            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
984                Box::pin(async {
985                    Ok(GrpcResponseData {
986                        payload: Bytes::new(),
987                        metadata: MetadataMap::new(),
988                    })
989                })
990            }
991
992            fn service_name(&self) -> &str {
993                "test.RequestPreservingService"
994            }
995
996            fn rpc_mode(&self) -> RpcMode {
997                RpcMode::ServerStreaming
998            }
999
1000            fn call_server_stream(
1001                &self,
1002                request: GrpcRequestData,
1003            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1004                Box::pin(async move {
1005                    // Verify request data is preserved
1006                    assert_eq!(request.service_name, "test.RequestPreservingService");
1007                    assert_eq!(request.method_name, "PreserveTest");
1008                    assert_eq!(request.payload, Bytes::from("test_payload"));
1009
1010                    let messages = vec![Bytes::from("response")];
1011                    Ok(super::super::streaming::message_stream_from_vec(messages))
1012                })
1013            }
1014        }
1015
1016        let handler = RequestPreservingStreamHandler;
1017        let request = GrpcRequestData {
1018            service_name: "test.RequestPreservingService".to_string(),
1019            method_name: "PreserveTest".to_string(),
1020            payload: Bytes::from("test_payload"),
1021            metadata: MetadataMap::new(),
1022        };
1023
1024        let result = handler.call_server_stream(request).await;
1025        assert!(result.is_ok());
1026
1027        let mut stream = result.unwrap();
1028        let msg = stream.next().await.unwrap().unwrap();
1029        assert_eq!(msg, Bytes::from("response"));
1030    }
1031
1032    #[tokio::test]
1033    async fn test_server_stream_with_various_error_codes() {
1034        struct ErrorCodeStreamHandler {
1035            error_code: tonic::Code,
1036        }
1037
1038        impl GrpcHandler for ErrorCodeStreamHandler {
1039            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1040                Box::pin(async {
1041                    Ok(GrpcResponseData {
1042                        payload: Bytes::new(),
1043                        metadata: MetadataMap::new(),
1044                    })
1045                })
1046            }
1047
1048            fn service_name(&self) -> &str {
1049                "test.ErrorCodeService"
1050            }
1051
1052            fn rpc_mode(&self) -> RpcMode {
1053                RpcMode::ServerStreaming
1054            }
1055
1056            fn call_server_stream(
1057                &self,
1058                _request: GrpcRequestData,
1059            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1060                let code = self.error_code;
1061                Box::pin(async move {
1062                    match code {
1063                        tonic::Code::InvalidArgument => Err(tonic::Status::invalid_argument("Invalid argument")),
1064                        tonic::Code::FailedPrecondition => {
1065                            Err(tonic::Status::failed_precondition("Failed precondition"))
1066                        }
1067                        tonic::Code::PermissionDenied => Err(tonic::Status::permission_denied("Permission denied")),
1068                        _ => Err(tonic::Status::internal("Internal error")),
1069                    }
1070                })
1071            }
1072        }
1073
1074        // Test InvalidArgument
1075        let handler = ErrorCodeStreamHandler {
1076            error_code: tonic::Code::InvalidArgument,
1077        };
1078        let request = GrpcRequestData {
1079            service_name: "test.ErrorCodeService".to_string(),
1080            method_name: "Error".to_string(),
1081            payload: Bytes::new(),
1082            metadata: MetadataMap::new(),
1083        };
1084        let result = handler.call_server_stream(request).await;
1085        assert!(result.is_err());
1086        if let Err(error) = result {
1087            assert_eq!(error.code(), tonic::Code::InvalidArgument);
1088        }
1089
1090        // Test FailedPrecondition
1091        let handler = ErrorCodeStreamHandler {
1092            error_code: tonic::Code::FailedPrecondition,
1093        };
1094        let request = GrpcRequestData {
1095            service_name: "test.ErrorCodeService".to_string(),
1096            method_name: "Error".to_string(),
1097            payload: Bytes::new(),
1098            metadata: MetadataMap::new(),
1099        };
1100        let result = handler.call_server_stream(request).await;
1101        assert!(result.is_err());
1102        if let Err(error) = result {
1103            assert_eq!(error.code(), tonic::Code::FailedPrecondition);
1104        }
1105
1106        // Test PermissionDenied
1107        let handler = ErrorCodeStreamHandler {
1108            error_code: tonic::Code::PermissionDenied,
1109        };
1110        let request = GrpcRequestData {
1111            service_name: "test.ErrorCodeService".to_string(),
1112            method_name: "Error".to_string(),
1113            payload: Bytes::new(),
1114            metadata: MetadataMap::new(),
1115        };
1116        let result = handler.call_server_stream(request).await;
1117        assert!(result.is_err());
1118        if let Err(error) = result {
1119            assert_eq!(error.code(), tonic::Code::PermissionDenied);
1120        }
1121    }
1122}