// spikard_http/grpc/handler.rs

//! Core `GrpcHandler` trait for language-agnostic gRPC request handling
//!
//! This module defines the handler trait that language bindings implement
//! to handle gRPC requests. Similar to the `HttpHandler` pattern but designed
//! specifically for gRPC's protobuf-based message format.
6
7use bytes::Bytes;
8use futures_util::StreamExt;
9use std::future::Future;
10use std::pin::Pin;
11use tonic::metadata::MetadataMap;
12
13use super::streaming::MessageStream;
14
/// RPC mode enum for declaring handler capabilities
///
/// Names the kind of RPC a handler implements. The value is read at handler
/// registration so requests can be routed to the matching handler method.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum RpcMode {
    /// Unary RPC: one request message, one response message
    Unary,
    /// Server streaming RPC: one request message, a stream of responses
    ServerStreaming,
    /// Client streaming RPC: a stream of requests, one response message
    ClientStreaming,
    /// Bidirectional streaming RPC: a stream of requests, a stream of responses
    BidirectionalStreaming,
}
30
31/// gRPC request data passed to handlers
32///
33/// Contains the parsed components of a gRPC request:
34/// - Service and method names from the request path
35/// - Serialized protobuf payload as bytes
36/// - Request metadata (headers)
37#[derive(Debug, Clone)]
38pub struct GrpcRequestData {
39    /// Fully qualified service name (e.g., "mypackage.MyService")
40    pub service_name: String,
41    /// Method name (e.g., "GetUser")
42    pub method_name: String,
43    /// Serialized protobuf message bytes
44    pub payload: Bytes,
45    /// gRPC metadata (similar to HTTP headers)
46    pub metadata: MetadataMap,
47}
48
49/// gRPC response data returned by handlers
50///
51/// Contains the serialized protobuf response and any metadata to include
52/// in the response headers.
53#[derive(Debug, Clone)]
54pub struct GrpcResponseData {
55    /// Serialized protobuf message bytes
56    pub payload: Bytes,
57    /// gRPC metadata to include in response (similar to HTTP headers)
58    pub metadata: MetadataMap,
59}
60
61/// Result type for gRPC handlers
62///
63/// Returns either:
64/// - Ok(GrpcResponseData): A successful response with payload and metadata
65/// - Err(tonic::Status): A gRPC error status with code and message
66pub type GrpcHandlerResult = Result<GrpcResponseData, tonic::Status>;
67
68/// Handler trait for gRPC requests
69///
70/// This is the language-agnostic interface that all gRPC handler implementations
71/// must satisfy. Language bindings (Python, TypeScript, Ruby, PHP) will implement
72/// this trait to bridge their runtime to Spikard's gRPC server.
73///
74/// Handlers declare their RPC mode (unary vs streaming) via the `rpc_mode()` method.
75/// The gRPC server uses this to route requests to either `call()` or `call_server_stream()`.
76///
77/// # Examples
78///
79/// ## Basic unary handler
80///
81/// ```ignore
82/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, GrpcResponseData, GrpcHandlerResult};
83/// use bytes::Bytes;
84/// use std::pin::Pin;
85/// use std::future::Future;
86///
87/// struct UnaryHandler;
88///
89/// impl GrpcHandler for UnaryHandler {
90///     fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
91///         Box::pin(async move {
92///             // Parse request.payload using protobuf deserialization
93///             let user_id = extract_id_from_payload(&request.payload);
94///
95///             // Process business logic
96///             let response_data = lookup_user(user_id).await?;
97///
98///             // Serialize response and return
99///             Ok(GrpcResponseData {
100///                 payload: serialize_user(&response_data),
101///                 metadata: tonic::metadata::MetadataMap::new(),
102///             })
103///         })
104///     }
105///
106///     fn service_name(&self) -> &str {
107///         "users.UserService"
108///     }
109///
110///     // Default rpc_mode() returns RpcMode::Unary
111/// }
112/// ```
113///
114/// ## Server streaming handler
115///
116/// ```ignore
117/// use spikard_http::grpc::{GrpcHandler, RpcMode, GrpcRequestData, MessageStream};
118/// use crate::grpc::streaming::message_stream_from_vec;
119/// use bytes::Bytes;
120/// use std::pin::Pin;
121/// use std::future::Future;
122///
123/// struct StreamingHandler;
124///
125/// impl GrpcHandler for StreamingHandler {
126///     fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send>> {
127///         // Unary call not used for streaming handlers, but must be implemented
128///         Box::pin(async {
129///             Err(tonic::Status::unimplemented("Use server streaming instead"))
130///         })
131///     }
132///
133///     fn service_name(&self) -> &str {
134///         "events.EventService"
135///     }
136///
137///     fn rpc_mode(&self) -> RpcMode {
138///         RpcMode::ServerStreaming
139///     }
140///
141///     fn call_server_stream(
142///         &self,
143///         request: GrpcRequestData,
144///     ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
145///         Box::pin(async move {
146///             // Parse request to extract stream criteria (e.g., user_id)
147///             let user_id = extract_id_from_payload(&request.payload);
148///
149///             // Generate messages (e.g., fetch events from database)
150///             let events = fetch_user_events(user_id).await?;
151///             let mut messages = Vec::new();
152///
153///             for event in events {
154///                 let serialized = serialize_event(&event);
155///                 messages.push(serialized);
156///             }
157///
158///             // Convert to stream and return
159///             Ok(message_stream_from_vec(messages))
160///         })
161///     }
162/// }
163/// ```
164///
165/// # Dispatch Behavior
166///
167/// The gRPC server uses `rpc_mode()` to determine which handler method to call:
168///
169/// | RpcMode | Handler Method | Use Case |
170/// |---------|---|---|
171/// | `Unary` | `call()` | Single request, single response |
172/// | `ServerStreaming` | `call_server_stream()` | Single request, multiple responses |
173/// | `ClientStreaming` | `call_client_stream()` | Multiple requests, single response |
174/// | `BidirectionalStreaming` | `call_bidi_stream()` | Multiple requests, multiple responses |
175///
176/// # Error Handling
177///
178/// Both `call()` and `call_server_stream()` return gRPC error status values:
179///
180/// ```ignore
181/// // Return a specific gRPC error
182/// fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
183///     Box::pin(async {
184///         let Some(id) = parse_id(&request.payload) else {
185///             return Err(tonic::Status::invalid_argument("Missing user ID"));
186///         };
187///
188///         // ... process ...
189///     })
190/// }
191/// ```
192pub trait GrpcHandler: Send + Sync {
193    /// Handle a gRPC request
194    ///
195    /// Takes the parsed request data and returns a future that resolves to either:
196    /// - Ok(GrpcResponseData): A successful response
197    /// - Err(tonic::Status): An error with appropriate gRPC status code
198    ///
199    /// # Arguments
200    ///
201    /// * `request` - The parsed gRPC request containing service/method names,
202    ///   serialized payload, and metadata
203    ///
204    /// # Returns
205    ///
206    /// A future that resolves to a GrpcHandlerResult
207    fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send + '_>>;
208
209    /// Get the fully qualified service name this handler serves
210    ///
211    /// This is used for routing requests to the appropriate handler.
212    /// Should return the fully qualified service name as defined in the .proto file.
213    ///
214    /// # Example
215    ///
216    /// For a service defined as:
217    /// ```proto
218    /// package mypackage;
219    /// service UserService { ... }
220    /// ```
221    ///
222    /// This should return "mypackage.UserService"
223    fn service_name(&self) -> &str;
224
225    /// Get the RPC mode this handler supports
226    ///
227    /// Returns the type of RPC this handler implements. Used at handler registration
228    /// to route requests to the appropriate handler method.
229    ///
230    /// Default implementation returns `RpcMode::Unary` for backward compatibility.
231    fn rpc_mode(&self) -> RpcMode {
232        RpcMode::Unary
233    }
234
235    /// Handle a server streaming RPC request
236    ///
237    /// Takes a single request and returns a stream of response messages.
238    /// Default implementation adapts the unary `call()` response into a
239    /// single-message stream.
240    ///
241    /// # Arguments
242    ///
243    /// * `request` - The parsed gRPC request
244    ///
245    /// # Returns
246    ///
247    /// A future that resolves to either a stream of messages or an error status
248    fn call_server_stream(
249        &self,
250        request: GrpcRequestData,
251    ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send + '_>> {
252        let unary_future = self.call(request);
253        Box::pin(async move {
254            let response = unary_future.await?;
255            Ok(crate::grpc::streaming::single_message_stream(response.payload))
256        })
257    }
258
259    /// Handle a client streaming RPC call
260    ///
261    /// Takes a stream of request messages and returns a single response message.
262    /// Default implementation adapts to unary by requiring exactly one
263    /// request message in the stream.
264    fn call_client_stream(
265        &self,
266        request: crate::grpc::streaming::StreamingRequest,
267    ) -> Pin<Box<dyn Future<Output = Result<GrpcResponseData, tonic::Status>> + Send + '_>> {
268        Box::pin(async move {
269            let crate::grpc::streaming::StreamingRequest {
270                service_name,
271                method_name,
272                mut message_stream,
273                metadata,
274            } = request;
275
276            let first_message = match message_stream.next().await {
277                Some(Ok(message)) => message,
278                Some(Err(status)) => return Err(status),
279                None => {
280                    return Err(tonic::Status::invalid_argument(
281                        "Client stream is empty; unary fallback requires exactly one request message",
282                    ));
283                }
284            };
285
286            if let Some(next_message) = message_stream.next().await {
287                match next_message {
288                    Ok(_) => {
289                        return Err(tonic::Status::invalid_argument(
290                            "Unary fallback requires exactly one request message",
291                        ));
292                    }
293                    Err(status) => return Err(status),
294                }
295            }
296
297            self.call(GrpcRequestData {
298                service_name,
299                method_name,
300                payload: first_message,
301                metadata,
302            })
303            .await
304        })
305    }
306
307    /// Handle a bidirectional streaming RPC call
308    ///
309    /// Takes a stream of request messages and returns a stream of response messages.
310    /// Default implementation adapts to unary by requiring exactly one
311    /// request message and returning a single-message response stream.
312    fn call_bidi_stream(
313        &self,
314        request: crate::grpc::streaming::StreamingRequest,
315    ) -> Pin<Box<dyn Future<Output = Result<crate::grpc::streaming::MessageStream, tonic::Status>> + Send + '_>> {
316        Box::pin(async move {
317            let response = self.call_client_stream(request).await?;
318            Ok(crate::grpc::streaming::single_message_stream(response.payload))
319        })
320    }
321}
322
323#[cfg(test)]
324mod tests {
325    use super::*;
326
327    struct TestGrpcHandler;
328
329    impl GrpcHandler for TestGrpcHandler {
330        fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
331            Box::pin(async {
332                Ok(GrpcResponseData {
333                    payload: Bytes::from("test response"),
334                    metadata: MetadataMap::new(),
335                })
336            })
337        }
338
339        fn service_name(&self) -> &str {
340            "test.TestService"
341        }
342    }
343
344    #[tokio::test]
345    async fn test_grpc_handler_basic_call() {
346        let handler = TestGrpcHandler;
347        let request = GrpcRequestData {
348            service_name: "test.TestService".to_string(),
349            method_name: "TestMethod".to_string(),
350            payload: Bytes::from("test payload"),
351            metadata: MetadataMap::new(),
352        };
353
354        let result = handler.call(request).await;
355        assert!(result.is_ok());
356
357        let response = result.unwrap();
358        assert_eq!(response.payload, Bytes::from("test response"));
359    }
360
361    #[test]
362    fn test_grpc_handler_service_name() {
363        let handler = TestGrpcHandler;
364        assert_eq!(handler.service_name(), "test.TestService");
365    }
366
367    #[test]
368    fn test_grpc_handler_default_rpc_mode() {
369        let handler = TestGrpcHandler;
370        assert_eq!(handler.rpc_mode(), RpcMode::Unary);
371    }
372
373    #[tokio::test]
374    async fn test_grpc_handler_default_server_stream_falls_back_to_unary() {
375        let handler = TestGrpcHandler;
376        let request = GrpcRequestData {
377            service_name: "test.TestService".to_string(),
378            method_name: "StreamMethod".to_string(),
379            payload: Bytes::new(),
380            metadata: MetadataMap::new(),
381        };
382
383        let result = handler.call_server_stream(request).await.unwrap();
384        let collected: Vec<_> = result.collect().await;
385        assert_eq!(collected.len(), 1);
386        assert_eq!(collected[0].as_ref().unwrap(), &Bytes::from("test response"));
387    }
388
389    #[tokio::test]
390    async fn test_grpc_handler_default_client_stream_falls_back_to_unary() {
391        let handler = TestGrpcHandler;
392        let request = crate::grpc::streaming::StreamingRequest {
393            service_name: "test.TestService".to_string(),
394            method_name: "ClientStreamMethod".to_string(),
395            message_stream: crate::grpc::streaming::message_stream_from_vec(vec![Bytes::from("one")]),
396            metadata: MetadataMap::new(),
397        };
398
399        let result = handler.call_client_stream(request).await.unwrap();
400        assert_eq!(result.payload, Bytes::from("test response"));
401    }
402
403    #[tokio::test]
404    async fn test_grpc_handler_default_client_stream_requires_single_message() {
405        let handler = TestGrpcHandler;
406        let request = crate::grpc::streaming::StreamingRequest {
407            service_name: "test.TestService".to_string(),
408            method_name: "ClientStreamMethod".to_string(),
409            message_stream: crate::grpc::streaming::message_stream_from_vec(vec![
410                Bytes::from("one"),
411                Bytes::from("two"),
412            ]),
413            metadata: MetadataMap::new(),
414        };
415
416        let error = handler.call_client_stream(request).await.unwrap_err();
417        assert_eq!(error.code(), tonic::Code::InvalidArgument);
418        assert!(error.message().contains("exactly one"));
419    }
420
421    #[tokio::test]
422    async fn test_grpc_handler_default_bidi_stream_falls_back_to_unary() {
423        let handler = TestGrpcHandler;
424        let request = crate::grpc::streaming::StreamingRequest {
425            service_name: "test.TestService".to_string(),
426            method_name: "BidiMethod".to_string(),
427            message_stream: crate::grpc::streaming::message_stream_from_vec(vec![Bytes::from("ping")]),
428            metadata: MetadataMap::new(),
429        };
430
431        let stream = handler.call_bidi_stream(request).await.unwrap();
432        let collected: Vec<_> = stream.collect().await;
433        assert_eq!(collected.len(), 1);
434        assert_eq!(collected[0].as_ref().unwrap(), &Bytes::from("test response"));
435    }
436
437    #[test]
438    fn test_grpc_request_data_creation() {
439        let request = GrpcRequestData {
440            service_name: "mypackage.MyService".to_string(),
441            method_name: "GetUser".to_string(),
442            payload: Bytes::from("payload"),
443            metadata: MetadataMap::new(),
444        };
445
446        assert_eq!(request.service_name, "mypackage.MyService");
447        assert_eq!(request.method_name, "GetUser");
448        assert_eq!(request.payload, Bytes::from("payload"));
449    }
450
451    #[test]
452    fn test_grpc_response_data_creation() {
453        let response = GrpcResponseData {
454            payload: Bytes::from("response"),
455            metadata: MetadataMap::new(),
456        };
457
458        assert_eq!(response.payload, Bytes::from("response"));
459        assert!(response.metadata.is_empty());
460    }
461
462    #[test]
463    fn test_grpc_request_data_clone() {
464        let original = GrpcRequestData {
465            service_name: "test.Service".to_string(),
466            method_name: "Method".to_string(),
467            payload: Bytes::from("data"),
468            metadata: MetadataMap::new(),
469        };
470
471        let cloned = original.clone();
472        assert_eq!(original.service_name, cloned.service_name);
473        assert_eq!(original.method_name, cloned.method_name);
474        assert_eq!(original.payload, cloned.payload);
475    }
476
477    #[test]
478    fn test_grpc_response_data_clone() {
479        let original = GrpcResponseData {
480            payload: Bytes::from("response data"),
481            metadata: MetadataMap::new(),
482        };
483
484        let cloned = original.clone();
485        assert_eq!(original.payload, cloned.payload);
486    }
487
488    #[tokio::test]
489    async fn test_grpc_handler_error_response() {
490        struct ErrorHandler;
491
492        impl GrpcHandler for ErrorHandler {
493            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
494                Box::pin(async { Err(tonic::Status::not_found("Resource not found")) })
495            }
496
497            fn service_name(&self) -> &str {
498                "test.ErrorService"
499            }
500        }
501
502        let handler = ErrorHandler;
503        let request = GrpcRequestData {
504            service_name: "test.ErrorService".to_string(),
505            method_name: "ErrorMethod".to_string(),
506            payload: Bytes::new(),
507            metadata: MetadataMap::new(),
508        };
509
510        let result = handler.call(request).await;
511        assert!(result.is_err());
512
513        let error = result.unwrap_err();
514        assert_eq!(error.code(), tonic::Code::NotFound);
515        assert_eq!(error.message(), "Resource not found");
516    }
517
518    // ==================== Server Streaming Tests ====================
519
520    #[tokio::test]
521    async fn test_server_stream_with_multiple_messages() {
522        use futures_util::StreamExt;
523
524        struct ServerStreamHandler;
525
526        impl GrpcHandler for ServerStreamHandler {
527            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
528                Box::pin(async {
529                    Ok(GrpcResponseData {
530                        payload: Bytes::from("unary response"),
531                        metadata: MetadataMap::new(),
532                    })
533                })
534            }
535
536            fn service_name(&self) -> &str {
537                "test.StreamService"
538            }
539
540            fn rpc_mode(&self) -> RpcMode {
541                RpcMode::ServerStreaming
542            }
543
544            fn call_server_stream(
545                &self,
546                _request: GrpcRequestData,
547            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
548                Box::pin(async {
549                    let messages = vec![
550                        Bytes::from("message1"),
551                        Bytes::from("message2"),
552                        Bytes::from("message3"),
553                    ];
554                    Ok(super::super::streaming::message_stream_from_vec(messages))
555                })
556            }
557        }
558
559        let handler = ServerStreamHandler;
560        let request = GrpcRequestData {
561            service_name: "test.StreamService".to_string(),
562            method_name: "StreamMethod".to_string(),
563            payload: Bytes::from("request data"),
564            metadata: MetadataMap::new(),
565        };
566
567        let result = handler.call_server_stream(request).await;
568        assert!(result.is_ok());
569
570        let mut stream = result.unwrap();
571        let msg1 = stream.next().await.unwrap().unwrap();
572        assert_eq!(msg1, Bytes::from("message1"));
573
574        let msg2 = stream.next().await.unwrap().unwrap();
575        assert_eq!(msg2, Bytes::from("message2"));
576
577        let msg3 = stream.next().await.unwrap().unwrap();
578        assert_eq!(msg3, Bytes::from("message3"));
579
580        assert!(stream.next().await.is_none());
581    }
582
583    #[tokio::test]
584    async fn test_server_stream_empty_stream() {
585        use futures_util::StreamExt;
586
587        struct EmptyStreamHandler;
588
589        impl GrpcHandler for EmptyStreamHandler {
590            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
591                Box::pin(async {
592                    Ok(GrpcResponseData {
593                        payload: Bytes::new(),
594                        metadata: MetadataMap::new(),
595                    })
596                })
597            }
598
599            fn service_name(&self) -> &str {
600                "test.EmptyStreamService"
601            }
602
603            fn rpc_mode(&self) -> RpcMode {
604                RpcMode::ServerStreaming
605            }
606
607            fn call_server_stream(
608                &self,
609                _request: GrpcRequestData,
610            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
611                Box::pin(async { Ok(super::super::streaming::empty_message_stream()) })
612            }
613        }
614
615        let handler = EmptyStreamHandler;
616        let request = GrpcRequestData {
617            service_name: "test.EmptyStreamService".to_string(),
618            method_name: "EmptyStream".to_string(),
619            payload: Bytes::new(),
620            metadata: MetadataMap::new(),
621        };
622
623        let result = handler.call_server_stream(request).await;
624        assert!(result.is_ok());
625
626        let mut stream = result.unwrap();
627        assert!(stream.next().await.is_none());
628    }
629
630    #[tokio::test]
631    async fn test_server_stream_with_error_mid_stream() {
632        use futures_util::StreamExt;
633
634        struct ErrorMidStreamHandler;
635
636        impl GrpcHandler for ErrorMidStreamHandler {
637            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
638                Box::pin(async {
639                    Ok(GrpcResponseData {
640                        payload: Bytes::new(),
641                        metadata: MetadataMap::new(),
642                    })
643                })
644            }
645
646            fn service_name(&self) -> &str {
647                "test.ErrorMidStreamService"
648            }
649
650            fn rpc_mode(&self) -> RpcMode {
651                RpcMode::ServerStreaming
652            }
653
654            fn call_server_stream(
655                &self,
656                _request: GrpcRequestData,
657            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
658                Box::pin(async {
659                    let messages = vec![
660                        Ok(Bytes::from("message1")),
661                        Ok(Bytes::from("message2")),
662                        Err(tonic::Status::internal("Stream error")),
663                    ];
664                    let stream: MessageStream = Box::pin(futures_util::stream::iter(messages));
665                    Ok(stream)
666                })
667            }
668        }
669
670        let handler = ErrorMidStreamHandler;
671        let request = GrpcRequestData {
672            service_name: "test.ErrorMidStreamService".to_string(),
673            method_name: "ErrorStream".to_string(),
674            payload: Bytes::new(),
675            metadata: MetadataMap::new(),
676        };
677
678        let result = handler.call_server_stream(request).await;
679        assert!(result.is_ok());
680
681        let mut stream = result.unwrap();
682
683        let msg1 = stream.next().await.unwrap().unwrap();
684        assert_eq!(msg1, Bytes::from("message1"));
685
686        let msg2 = stream.next().await.unwrap().unwrap();
687        assert_eq!(msg2, Bytes::from("message2"));
688
689        let error_result = stream.next().await.unwrap();
690        assert!(error_result.is_err());
691        let error = error_result.unwrap_err();
692        assert_eq!(error.code(), tonic::Code::Internal);
693        assert_eq!(error.message(), "Stream error");
694
695        assert!(stream.next().await.is_none());
696    }
697
698    #[tokio::test]
699    async fn test_server_stream_returns_error() {
700        struct FailingStreamHandler;
701
702        impl GrpcHandler for FailingStreamHandler {
703            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
704                Box::pin(async {
705                    Ok(GrpcResponseData {
706                        payload: Bytes::new(),
707                        metadata: MetadataMap::new(),
708                    })
709                })
710            }
711
712            fn service_name(&self) -> &str {
713                "test.FailingStreamService"
714            }
715
716            fn rpc_mode(&self) -> RpcMode {
717                RpcMode::ServerStreaming
718            }
719
720            fn call_server_stream(
721                &self,
722                _request: GrpcRequestData,
723            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
724                Box::pin(async { Err(tonic::Status::unavailable("Stream unavailable")) })
725            }
726        }
727
728        let handler = FailingStreamHandler;
729        let request = GrpcRequestData {
730            service_name: "test.FailingStreamService".to_string(),
731            method_name: "FailingStream".to_string(),
732            payload: Bytes::new(),
733            metadata: MetadataMap::new(),
734        };
735
736        let result = handler.call_server_stream(request).await;
737        assert!(result.is_err());
738
739        if let Err(error) = result {
740            assert_eq!(error.code(), tonic::Code::Unavailable);
741            assert_eq!(error.message(), "Stream unavailable");
742        } else {
743            panic!("Expected error");
744        }
745    }
746
747    #[tokio::test]
748    async fn test_server_stream_with_metadata() {
749        use futures_util::StreamExt;
750
751        struct MetadataStreamHandler;
752
753        impl GrpcHandler for MetadataStreamHandler {
754            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
755                Box::pin(async {
756                    Ok(GrpcResponseData {
757                        payload: Bytes::new(),
758                        metadata: MetadataMap::new(),
759                    })
760                })
761            }
762
763            fn service_name(&self) -> &str {
764                "test.MetadataStreamService"
765            }
766
767            fn rpc_mode(&self) -> RpcMode {
768                RpcMode::ServerStreaming
769            }
770
771            fn call_server_stream(
772                &self,
773                request: GrpcRequestData,
774            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
775                Box::pin(async move {
776                    // Verify metadata is received
777                    assert!(!request.metadata.is_empty());
778                    let messages = vec![Bytes::from("metadata_message")];
779                    Ok(super::super::streaming::message_stream_from_vec(messages))
780                })
781            }
782        }
783
784        let handler = MetadataStreamHandler;
785        let mut metadata = MetadataMap::new();
786        metadata.insert(
787            "x-request-id",
788            "test-request-123"
789                .parse::<tonic::metadata::MetadataValue<tonic::metadata::Ascii>>()
790                .unwrap(),
791        );
792
793        let request = GrpcRequestData {
794            service_name: "test.MetadataStreamService".to_string(),
795            method_name: "MetadataStream".to_string(),
796            payload: Bytes::new(),
797            metadata,
798        };
799
800        let result = handler.call_server_stream(request).await;
801        assert!(result.is_ok());
802
803        let mut stream = result.unwrap();
804        let msg = stream.next().await.unwrap().unwrap();
805        assert_eq!(msg, Bytes::from("metadata_message"));
806    }
807
808    #[tokio::test]
809    async fn test_server_stream_large_stream_100_messages() {
810        use futures_util::StreamExt;
811
812        struct LargeStreamHandler;
813
814        impl GrpcHandler for LargeStreamHandler {
815            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
816                Box::pin(async {
817                    Ok(GrpcResponseData {
818                        payload: Bytes::new(),
819                        metadata: MetadataMap::new(),
820                    })
821                })
822            }
823
824            fn service_name(&self) -> &str {
825                "test.LargeStreamService"
826            }
827
828            fn rpc_mode(&self) -> RpcMode {
829                RpcMode::ServerStreaming
830            }
831
832            fn call_server_stream(
833                &self,
834                _request: GrpcRequestData,
835            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
836                Box::pin(async {
837                    let mut messages = Vec::new();
838                    for i in 0..100 {
839                        messages.push(Bytes::from(format!("message_{}", i)));
840                    }
841                    Ok(super::super::streaming::message_stream_from_vec(messages))
842                })
843            }
844        }
845
846        let handler = LargeStreamHandler;
847        let request = GrpcRequestData {
848            service_name: "test.LargeStreamService".to_string(),
849            method_name: "LargeStream".to_string(),
850            payload: Bytes::new(),
851            metadata: MetadataMap::new(),
852        };
853
854        let result = handler.call_server_stream(request).await;
855        assert!(result.is_ok());
856
857        let mut stream = result.unwrap();
858        for i in 0..100 {
859            let msg = stream.next().await.unwrap().unwrap();
860            assert_eq!(msg, Bytes::from(format!("message_{}", i)));
861        }
862
863        assert!(stream.next().await.is_none());
864    }
865
866    #[tokio::test]
867    async fn test_server_stream_large_stream_500_messages() {
868        use futures_util::StreamExt;
869
870        struct VeryLargeStreamHandler;
871
872        impl GrpcHandler for VeryLargeStreamHandler {
873            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
874                Box::pin(async {
875                    Ok(GrpcResponseData {
876                        payload: Bytes::new(),
877                        metadata: MetadataMap::new(),
878                    })
879                })
880            }
881
882            fn service_name(&self) -> &str {
883                "test.VeryLargeStreamService"
884            }
885
886            fn rpc_mode(&self) -> RpcMode {
887                RpcMode::ServerStreaming
888            }
889
890            fn call_server_stream(
891                &self,
892                _request: GrpcRequestData,
893            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
894                Box::pin(async {
895                    let mut messages = Vec::new();
896                    for i in 0..500 {
897                        messages.push(Bytes::from(format!("msg_{}", i)));
898                    }
899                    Ok(super::super::streaming::message_stream_from_vec(messages))
900                })
901            }
902        }
903
904        let handler = VeryLargeStreamHandler;
905        let request = GrpcRequestData {
906            service_name: "test.VeryLargeStreamService".to_string(),
907            method_name: "VeryLargeStream".to_string(),
908            payload: Bytes::new(),
909            metadata: MetadataMap::new(),
910        };
911
912        let result = handler.call_server_stream(request).await;
913        assert!(result.is_ok());
914
915        let mut stream = result.unwrap();
916        let mut count = 0;
917        while let Some(item) = stream.next().await {
918            let msg = item.unwrap();
919            assert_eq!(msg, Bytes::from(format!("msg_{}", count)));
920            count += 1;
921        }
922        assert_eq!(count, 500);
923    }
924
925    #[test]
926    fn test_rpc_mode_unary() {
927        let handler = TestGrpcHandler;
928        assert_eq!(handler.rpc_mode(), RpcMode::Unary);
929    }
930
931    #[test]
932    fn test_rpc_mode_server_streaming() {
933        struct ServerStreamTestHandler;
934
935        impl GrpcHandler for ServerStreamTestHandler {
936            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
937                Box::pin(async {
938                    Ok(GrpcResponseData {
939                        payload: Bytes::new(),
940                        metadata: MetadataMap::new(),
941                    })
942                })
943            }
944
945            fn service_name(&self) -> &str {
946                "test.ServerStreamTestService"
947            }
948
949            fn rpc_mode(&self) -> RpcMode {
950                RpcMode::ServerStreaming
951            }
952        }
953
954        let handler = ServerStreamTestHandler;
955        assert_eq!(handler.rpc_mode(), RpcMode::ServerStreaming);
956    }
957
958    #[test]
959    fn test_rpc_mode_client_streaming() {
960        struct ClientStreamTestHandler;
961
962        impl GrpcHandler for ClientStreamTestHandler {
963            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
964                Box::pin(async {
965                    Ok(GrpcResponseData {
966                        payload: Bytes::new(),
967                        metadata: MetadataMap::new(),
968                    })
969                })
970            }
971
972            fn service_name(&self) -> &str {
973                "test.ClientStreamTestService"
974            }
975
976            fn rpc_mode(&self) -> RpcMode {
977                RpcMode::ClientStreaming
978            }
979        }
980
981        let handler = ClientStreamTestHandler;
982        assert_eq!(handler.rpc_mode(), RpcMode::ClientStreaming);
983    }
984
985    #[test]
986    fn test_rpc_mode_bidirectional_streaming() {
987        struct BiDirectionalStreamTestHandler;
988
989        impl GrpcHandler for BiDirectionalStreamTestHandler {
990            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
991                Box::pin(async {
992                    Ok(GrpcResponseData {
993                        payload: Bytes::new(),
994                        metadata: MetadataMap::new(),
995                    })
996                })
997            }
998
999            fn service_name(&self) -> &str {
1000                "test.BiDirectionalStreamTestService"
1001            }
1002
1003            fn rpc_mode(&self) -> RpcMode {
1004                RpcMode::BidirectionalStreaming
1005            }
1006        }
1007
1008        let handler = BiDirectionalStreamTestHandler;
1009        assert_eq!(handler.rpc_mode(), RpcMode::BidirectionalStreaming);
1010    }
1011
1012    #[tokio::test]
1013    async fn test_server_stream_single_message() {
1014        use futures_util::StreamExt;
1015
1016        struct SingleMessageStreamHandler;
1017
1018        impl GrpcHandler for SingleMessageStreamHandler {
1019            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1020                Box::pin(async {
1021                    Ok(GrpcResponseData {
1022                        payload: Bytes::new(),
1023                        metadata: MetadataMap::new(),
1024                    })
1025                })
1026            }
1027
1028            fn service_name(&self) -> &str {
1029                "test.SingleMessageStreamService"
1030            }
1031
1032            fn rpc_mode(&self) -> RpcMode {
1033                RpcMode::ServerStreaming
1034            }
1035
1036            fn call_server_stream(
1037                &self,
1038                _request: GrpcRequestData,
1039            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1040                Box::pin(async {
1041                    Ok(super::super::streaming::single_message_stream(Bytes::from(
1042                        "single_msg",
1043                    )))
1044                })
1045            }
1046        }
1047
1048        let handler = SingleMessageStreamHandler;
1049        let request = GrpcRequestData {
1050            service_name: "test.SingleMessageStreamService".to_string(),
1051            method_name: "SingleMessage".to_string(),
1052            payload: Bytes::new(),
1053            metadata: MetadataMap::new(),
1054        };
1055
1056        let result = handler.call_server_stream(request).await;
1057        assert!(result.is_ok());
1058
1059        let mut stream = result.unwrap();
1060        let msg = stream.next().await.unwrap().unwrap();
1061        assert_eq!(msg, Bytes::from("single_msg"));
1062        assert!(stream.next().await.is_none());
1063    }
1064
1065    #[tokio::test]
1066    async fn test_server_stream_preserves_request_data() {
1067        use futures_util::StreamExt;
1068
1069        struct RequestPreservingStreamHandler;
1070
1071        impl GrpcHandler for RequestPreservingStreamHandler {
1072            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1073                Box::pin(async {
1074                    Ok(GrpcResponseData {
1075                        payload: Bytes::new(),
1076                        metadata: MetadataMap::new(),
1077                    })
1078                })
1079            }
1080
1081            fn service_name(&self) -> &str {
1082                "test.RequestPreservingService"
1083            }
1084
1085            fn rpc_mode(&self) -> RpcMode {
1086                RpcMode::ServerStreaming
1087            }
1088
1089            fn call_server_stream(
1090                &self,
1091                request: GrpcRequestData,
1092            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1093                Box::pin(async move {
1094                    // Verify request data is preserved
1095                    assert_eq!(request.service_name, "test.RequestPreservingService");
1096                    assert_eq!(request.method_name, "PreserveTest");
1097                    assert_eq!(request.payload, Bytes::from("test_payload"));
1098
1099                    let messages = vec![Bytes::from("response")];
1100                    Ok(super::super::streaming::message_stream_from_vec(messages))
1101                })
1102            }
1103        }
1104
1105        let handler = RequestPreservingStreamHandler;
1106        let request = GrpcRequestData {
1107            service_name: "test.RequestPreservingService".to_string(),
1108            method_name: "PreserveTest".to_string(),
1109            payload: Bytes::from("test_payload"),
1110            metadata: MetadataMap::new(),
1111        };
1112
1113        let result = handler.call_server_stream(request).await;
1114        assert!(result.is_ok());
1115
1116        let mut stream = result.unwrap();
1117        let msg = stream.next().await.unwrap().unwrap();
1118        assert_eq!(msg, Bytes::from("response"));
1119    }
1120
1121    #[tokio::test]
1122    async fn test_server_stream_with_various_error_codes() {
1123        struct ErrorCodeStreamHandler {
1124            error_code: tonic::Code,
1125        }
1126
1127        impl GrpcHandler for ErrorCodeStreamHandler {
1128            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
1129                Box::pin(async {
1130                    Ok(GrpcResponseData {
1131                        payload: Bytes::new(),
1132                        metadata: MetadataMap::new(),
1133                    })
1134                })
1135            }
1136
1137            fn service_name(&self) -> &str {
1138                "test.ErrorCodeService"
1139            }
1140
1141            fn rpc_mode(&self) -> RpcMode {
1142                RpcMode::ServerStreaming
1143            }
1144
1145            fn call_server_stream(
1146                &self,
1147                _request: GrpcRequestData,
1148            ) -> Pin<Box<dyn Future<Output = Result<MessageStream, tonic::Status>> + Send>> {
1149                let code = self.error_code;
1150                Box::pin(async move {
1151                    match code {
1152                        tonic::Code::InvalidArgument => Err(tonic::Status::invalid_argument("Invalid argument")),
1153                        tonic::Code::FailedPrecondition => {
1154                            Err(tonic::Status::failed_precondition("Failed precondition"))
1155                        }
1156                        tonic::Code::PermissionDenied => Err(tonic::Status::permission_denied("Permission denied")),
1157                        _ => Err(tonic::Status::internal("Internal error")),
1158                    }
1159                })
1160            }
1161        }
1162
1163        // Test InvalidArgument
1164        let handler = ErrorCodeStreamHandler {
1165            error_code: tonic::Code::InvalidArgument,
1166        };
1167        let request = GrpcRequestData {
1168            service_name: "test.ErrorCodeService".to_string(),
1169            method_name: "Error".to_string(),
1170            payload: Bytes::new(),
1171            metadata: MetadataMap::new(),
1172        };
1173        let result = handler.call_server_stream(request).await;
1174        assert!(result.is_err());
1175        if let Err(error) = result {
1176            assert_eq!(error.code(), tonic::Code::InvalidArgument);
1177        }
1178
1179        // Test FailedPrecondition
1180        let handler = ErrorCodeStreamHandler {
1181            error_code: tonic::Code::FailedPrecondition,
1182        };
1183        let request = GrpcRequestData {
1184            service_name: "test.ErrorCodeService".to_string(),
1185            method_name: "Error".to_string(),
1186            payload: Bytes::new(),
1187            metadata: MetadataMap::new(),
1188        };
1189        let result = handler.call_server_stream(request).await;
1190        assert!(result.is_err());
1191        if let Err(error) = result {
1192            assert_eq!(error.code(), tonic::Code::FailedPrecondition);
1193        }
1194
1195        // Test PermissionDenied
1196        let handler = ErrorCodeStreamHandler {
1197            error_code: tonic::Code::PermissionDenied,
1198        };
1199        let request = GrpcRequestData {
1200            service_name: "test.ErrorCodeService".to_string(),
1201            method_name: "Error".to_string(),
1202            payload: Bytes::new(),
1203            metadata: MetadataMap::new(),
1204        };
1205        let result = handler.call_server_stream(request).await;
1206        assert!(result.is_err());
1207        if let Err(error) = result {
1208            assert_eq!(error.code(), tonic::Code::PermissionDenied);
1209        }
1210    }
1211}