Skip to main content

spikard_http/grpc/
service.rs

1//! Tonic service bridge
2//!
3//! This module bridges Tonic's service traits with our GrpcHandler trait.
4//! It handles the conversion between Tonic's types and our internal representation,
5//! enabling language-agnostic gRPC handling.
6
7use crate::grpc::handler::{GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData};
8use crate::grpc::streaming::MessageStream;
9use bytes::Bytes;
10use futures_util::StreamExt;
11use std::sync::Arc;
12use tonic::{Request, Response, Status};
13
14/// Generic gRPC service that routes requests to a GrpcHandler
15///
16/// This service implements Tonic's server traits and routes all requests
17/// to the provided GrpcHandler implementation. It handles serialization
18/// at the boundary between Tonic and our handler trait.
19///
20/// # Example
21///
22/// ```ignore
23/// use spikard_http::grpc::service::GenericGrpcService;
24/// use std::sync::Arc;
25///
26/// let handler = Arc::new(MyGrpcHandler);
27/// let service = GenericGrpcService::new(handler);
28/// ```
29pub struct GenericGrpcService {
30    handler: Arc<dyn GrpcHandler>,
31}
32
33impl GenericGrpcService {
34    /// Create a new generic gRPC service with the given handler
35    pub fn new(handler: Arc<dyn GrpcHandler>) -> Self {
36        Self { handler }
37    }
38
39    /// Handle a unary RPC call
40    ///
41    /// Converts the Tonic Request into our GrpcRequestData format,
42    /// calls the handler, and converts the result back to a Tonic Response.
43    ///
44    /// # Arguments
45    ///
46    /// * `service_name` - Fully qualified service name
47    /// * `method_name` - Method name
48    /// * `request` - Tonic request containing the serialized protobuf message
49    pub async fn handle_unary(
50        &self,
51        service_name: String,
52        method_name: String,
53        request: Request<Bytes>,
54    ) -> Result<Response<Bytes>, Status> {
55        // Extract metadata and payload from Tonic request
56        let (metadata, _extensions, payload) = request.into_parts();
57
58        // Create our internal request representation
59        let grpc_request = GrpcRequestData {
60            service_name,
61            method_name,
62            payload,
63            metadata,
64        };
65
66        // Call the handler
67        let result: GrpcHandlerResult = self.handler.call(grpc_request).await;
68
69        // Convert result to Tonic response
70        match result {
71            Ok(grpc_response) => {
72                let mut response = Response::new(grpc_response.payload);
73                copy_metadata(&grpc_response.metadata, response.metadata_mut());
74                Ok(response)
75            }
76            Err(status) => Err(status),
77        }
78    }
79
80    /// Handle a server streaming RPC call
81    ///
82    /// Takes a single request and returns a stream of response messages.
83    /// Converts the Tonic Request into our GrpcRequestData format, calls the
84    /// handler's call_server_stream method, and converts the MessageStream
85    /// into a Tonic streaming response body.
86    ///
87    /// # Arguments
88    ///
89    /// * `service_name` - Fully qualified service name
90    /// * `method_name` - Method name
91    /// * `request` - Tonic request containing the serialized protobuf message
92    ///
93    /// # Returns
94    ///
95    /// A Response with a streaming body containing the message stream
96    ///
97    /// # Error Propagation Limitations
98    ///
99    /// When a stream returns an error mid-stream (after messages have begun
100    /// being sent), the error may not be perfectly transmitted to the client
101    /// as a gRPC trailer. This is due to limitations in Axum's `Body::from_stream`:
102    ///
103    /// - **Pre-stream errors** (before any messages): Properly converted to
104    ///   HTTP status codes and returned to the client
105    /// - **Mid-stream errors** (after messages have begun): The error is converted
106    ///   to a generic `BoxError`, and the stream terminates. The connection is
107    ///   properly closed, but the gRPC status code metadata is lost.
108    ///
109    /// For robust error handling in streaming RPCs:
110    /// - Prefer detecting errors early (before sending messages) when possible
111    /// - Include error information in the message stream itself if critical
112    ///   (application-level error messages in the protobuf)
113    /// - For true gRPC trailer support, consider implementing a custom Axum
114    ///   body type that wraps the stream and can inject trailers on error
115    ///
116    /// See: <https://github.com/tokio-rs/axum/discussions/2043>
117    pub async fn handle_server_stream(
118        &self,
119        service_name: String,
120        method_name: String,
121        request: Request<Bytes>,
122    ) -> Result<Response<axum::body::Body>, Status> {
123        // Extract metadata and payload from Tonic request
124        let (metadata, _extensions, payload) = request.into_parts();
125
126        // Create our internal request representation
127        let grpc_request = GrpcRequestData {
128            service_name,
129            method_name,
130            payload,
131            metadata,
132        };
133
134        // Call the handler's server streaming method
135        let message_stream: MessageStream = self.handler.call_server_stream(grpc_request).await?;
136
137        // Convert MessageStream to axum Body
138        //
139        // LIMITATION: When converting tonic::Status errors from the stream,
140        // we lose the gRPC status metadata. The Status is converted to a
141        // generic Box<dyn Error>, and Axum's Body::from_stream doesn't have
142        // special handling for gRPC error semantics.
143        //
144        // Current behavior:
145        // - Stream errors are converted to BoxError
146        // - Body stream terminates on the first error
147        // - Connection is properly closed
148        // - Error metadata (status code, message) is not transmitted to client
149        //
150        // TODO: Implement custom Body wrapper that can:
151        // 1. Capture tonic::Status errors
152        // 2. Extract status code and message
153        // 3. Inject gRPC trailers (grpc-status, grpc-message) when stream ends
154        // 4. Properly signal error to client while preserving partial messages
155        //
156        // This would require implementing a custom StreamBody or similar that
157        // understands gRPC error semantics.
158        let byte_stream =
159            message_stream.map(|result| result.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>));
160
161        let body = axum::body::Body::from_stream(byte_stream);
162
163        // Create response with streaming body
164        let response = Response::new(body);
165
166        Ok(response)
167    }
168
169    /// Handle a client streaming RPC call
170    ///
171    /// Takes a request body stream of protobuf messages and returns a single response.
172    /// Parses the HTTP/2 body stream using gRPC frame parser, creates a MessageStream,
173    /// calls the handler's call_client_stream method, and converts the GrpcResponseData
174    /// back to a Tonic Response.
175    ///
176    /// # Arguments
177    ///
178    /// * `service_name` - Fully qualified service name
179    /// * `method_name` - Method name
180    /// * `request` - Axum request with streaming body containing HTTP/2 framed protobuf messages
181    /// * `max_message_size` - Maximum size per message (bytes)
182    ///
183    /// # Returns
184    ///
185    /// A Response with a single message body
186    ///
187    /// # Stream Handling
188    ///
189    /// The request body stream contains framed protobuf messages. Each frame is parsed
190    /// and validated for size:
191    /// - Messages within `max_message_size` are passed to the handler
192    /// - Messages exceeding the limit result in a ResourceExhausted error
193    /// - Invalid frames result in InvalidArgument errors
194    /// - The stream terminates when the client closes the write side
195    ///
196    /// # Frame Format
197    ///
198    /// Frames follow the gRPC HTTP/2 protocol format:
199    /// - 1 byte: compression flag (0 = uncompressed)
200    /// - 4 bytes: message size (big-endian)
201    /// - N bytes: message payload
202    ///
203    /// # Metadata and Trailers
204    ///
205    /// - Request metadata (headers) from the Tonic request is passed to the handler
206    /// - Response metadata from the handler is included in the response headers
207    /// - gRPC trailers (like grpc-status) should be handled by the caller
208    pub async fn handle_client_stream(
209        &self,
210        service_name: String,
211        method_name: String,
212        request: Request<axum::body::Body>,
213        max_message_size: usize,
214    ) -> Result<Response<Bytes>, Status> {
215        // Extract metadata and body from Tonic request
216        let (metadata, _extensions, body) = request.into_parts();
217
218        // Parse HTTP/2 body into stream of gRPC frames with size validation
219        let message_stream = crate::grpc::framing::parse_grpc_client_stream(body, max_message_size).await?;
220
221        // Create our internal streaming request representation
222        let streaming_request = crate::grpc::streaming::StreamingRequest {
223            service_name,
224            method_name,
225            message_stream,
226            metadata,
227        };
228
229        // Call the handler's client streaming method
230        let response: crate::grpc::handler::GrpcHandlerResult =
231            self.handler.call_client_stream(streaming_request).await;
232
233        // Convert result to Tonic response
234        match response {
235            Ok(grpc_response) => {
236                let mut tonic_response = Response::new(grpc_response.payload);
237                copy_metadata(&grpc_response.metadata, tonic_response.metadata_mut());
238                Ok(tonic_response)
239            }
240            Err(status) => Err(status),
241        }
242    }
243
244    /// Handle a bidirectional streaming RPC call
245    ///
246    /// Takes a request body stream and returns a stream of response messages.
247    /// Parses the HTTP/2 body stream using gRPC frame parser, creates a StreamingRequest,
248    /// calls the handler's call_bidi_stream method, and converts the MessageStream
249    /// back to an Axum streaming response body.
250    ///
251    /// # Arguments
252    ///
253    /// * `service_name` - Fully qualified service name
254    /// * `method_name` - Method name
255    /// * `request` - Axum request with streaming body containing HTTP/2 framed protobuf messages
256    /// * `max_message_size` - Maximum size per message (bytes)
257    ///
258    /// # Returns
259    ///
260    /// A Response with a streaming body containing response messages
261    ///
262    /// # Stream Handling
263    ///
264    /// - Request stream: Parsed from HTTP/2 body using frame parser
265    /// - Response stream: Converted from MessageStream to Axum Body
266    /// - Both streams are independent (full-duplex)
267    /// - Errors in either stream are propagated appropriately
268    ///
269    /// # Error Propagation
270    ///
271    /// Similar to server streaming, mid-stream errors in the response may not be
272    /// perfectly transmitted as gRPC trailers due to Axum Body::from_stream limitations.
273    /// See handle_server_stream() documentation for details.
274    pub async fn handle_bidi_stream(
275        &self,
276        service_name: String,
277        method_name: String,
278        request: Request<axum::body::Body>,
279        max_message_size: usize,
280    ) -> Result<Response<axum::body::Body>, Status> {
281        // Extract metadata and body from Tonic request
282        let (metadata, _extensions, body) = request.into_parts();
283
284        // Parse HTTP/2 body into stream of gRPC frames with size validation
285        let message_stream = crate::grpc::framing::parse_grpc_client_stream(body, max_message_size).await?;
286
287        // Create our internal streaming request representation
288        let streaming_request = crate::grpc::streaming::StreamingRequest {
289            service_name,
290            method_name,
291            message_stream,
292            metadata,
293        };
294
295        // Call the handler's bidirectional streaming method
296        let response_stream: MessageStream = self.handler.call_bidi_stream(streaming_request).await?;
297
298        // Convert MessageStream to axum Body (same as server streaming)
299        let byte_stream =
300            response_stream.map(|result| result.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>));
301
302        let body = axum::body::Body::from_stream(byte_stream);
303        let response = Response::new(body);
304
305        Ok(response)
306    }
307
308    /// Get the service name from the handler
309    pub fn service_name(&self) -> &str {
310        self.handler.service_name()
311    }
312}
313
314/// Helper function to parse gRPC path into service and method names
315///
316/// gRPC paths follow the format: `/<package>.<service>/<method>`
317///
318/// # Example
319///
320/// ```ignore
321/// use spikard_http::grpc::service::parse_grpc_path;
322///
323/// let (service, method) = parse_grpc_path("/mypackage.UserService/GetUser").unwrap();
324/// assert_eq!(service, "mypackage.UserService");
325/// assert_eq!(method, "GetUser");
326/// ```
327pub fn parse_grpc_path(path: &str) -> Result<(String, String), Status> {
328    // gRPC paths are in the format: /<package>.<service>/<method>
329    let path = path.trim_start_matches('/');
330    let parts: Vec<&str> = path.split('/').collect();
331
332    if parts.len() != 2 {
333        return Err(Status::invalid_argument(format!("Invalid gRPC path: {}", path)));
334    }
335
336    let service_name = parts[0].to_string();
337    let method_name = parts[1].to_string();
338
339    if service_name.is_empty() || method_name.is_empty() {
340        return Err(Status::invalid_argument("Service or method name is empty"));
341    }
342
343    Ok((service_name, method_name))
344}
345
346/// Check if a request is a gRPC request
347///
348/// Checks the content-type header for "application/grpc" prefix.
349///
350/// # Example
351///
352/// ```ignore
353/// use spikard_http::grpc::service::is_grpc_request;
354/// use axum::http::HeaderMap;
355///
356/// let mut headers = HeaderMap::new();
357/// headers.insert("content-type", "application/grpc".parse().unwrap());
358///
359/// assert!(is_grpc_request(&headers));
360/// ```
361pub fn is_grpc_request(headers: &axum::http::HeaderMap) -> bool {
362    headers
363        .get(axum::http::header::CONTENT_TYPE)
364        .and_then(|v| v.to_str().ok())
365        .map(|v| v.starts_with("application/grpc"))
366        .unwrap_or(false)
367}
368
369/// Copy metadata from source to destination MetadataMap
370///
371/// Efficiently copies all metadata entries (both ASCII and binary)
372/// from one MetadataMap to another without unnecessary allocations.
373///
374/// # Arguments
375///
376/// * `source` - Source metadata to copy from
377/// * `dest` - Destination metadata to copy into
378pub fn copy_metadata(source: &tonic::metadata::MetadataMap, dest: &mut tonic::metadata::MetadataMap) {
379    for key_value in source.iter() {
380        match key_value {
381            tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
382                dest.insert(key, value.clone());
383            }
384            tonic::metadata::KeyAndValueRef::Binary(key, value) => {
385                dest.insert_bin(key, value.clone());
386            }
387        }
388    }
389}
390
391/// Convert GrpcResponseData to Tonic Response
392///
393/// Helper function to convert our internal response representation
394/// to a Tonic Response.
395pub fn grpc_response_to_tonic(response: GrpcResponseData) -> Response<Bytes> {
396    let mut tonic_response = Response::new(response.payload);
397    copy_metadata(&response.metadata, tonic_response.metadata_mut());
398    tonic_response
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grpc::handler::GrpcHandler;
    use std::future::Future;
    use std::pin::Pin;
    use tonic::metadata::MetadataMap;

    /// Handler that answers every unary call with the request payload and
    /// empty metadata.
    struct TestHandler;

    impl GrpcHandler for TestHandler {
        fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
            Box::pin(async move {
                let echoed = GrpcResponseData {
                    payload: request.payload,
                    metadata: MetadataMap::new(),
                };
                Ok(echoed)
            })
        }

        fn service_name(&self) -> &str {
            "test.TestService"
        }
    }

    /// Service wired to the echoing `TestHandler`.
    fn echo_service() -> GenericGrpcService {
        GenericGrpcService::new(Arc::new(TestHandler))
    }

    /// Header map containing only the given content type.
    fn content_type_headers(value: &'static str) -> axum::http::HeaderMap {
        let mut headers = axum::http::HeaderMap::new();
        headers.insert(
            axum::http::header::CONTENT_TYPE,
            axum::http::HeaderValue::from_static(value),
        );
        headers
    }

    // --- GenericGrpcService ------------------------------------------------

    #[tokio::test]
    async fn test_generic_grpc_service_handle_unary() {
        let response = echo_service()
            .handle_unary(
                String::from("test.TestService"),
                String::from("TestMethod"),
                Request::new(Bytes::from("test payload")),
            )
            .await
            .expect("echo handler should succeed");

        assert_eq!(response.into_inner(), Bytes::from("test payload"));
    }

    #[tokio::test]
    async fn test_generic_grpc_service_with_metadata() {
        let mut request = Request::new(Bytes::from("payload"));
        let header_value = "custom-value".parse().expect("valid ASCII metadata value");
        request.metadata_mut().insert("custom-header", header_value);

        let outcome = echo_service()
            .handle_unary(String::from("test.TestService"), String::from("TestMethod"), request)
            .await;

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_generic_grpc_service_service_name() {
        assert_eq!(echo_service().service_name(), "test.TestService");
    }

    #[tokio::test]
    async fn test_generic_grpc_service_error_handling() {
        /// Handler that always fails with NotFound.
        struct ErrorHandler;

        impl GrpcHandler for ErrorHandler {
            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
                Box::pin(async { Err(Status::not_found("Resource not found")) })
            }

            fn service_name(&self) -> &str {
                "test.ErrorService"
            }
        }

        let service = GenericGrpcService::new(Arc::new(ErrorHandler));

        let status = service
            .handle_unary(
                String::from("test.ErrorService"),
                String::from("ErrorMethod"),
                Request::new(Bytes::new()),
            )
            .await
            .expect_err("failing handler must surface its status");

        assert_eq!(status.code(), tonic::Code::NotFound);
        assert_eq!(status.message(), "Resource not found");
    }

    // --- parse_grpc_path ---------------------------------------------------

    #[test]
    fn test_parse_grpc_path_valid() {
        let (service, method) =
            parse_grpc_path("/mypackage.UserService/GetUser").expect("well-formed path should parse");
        assert_eq!(service, "mypackage.UserService");
        assert_eq!(method, "GetUser");
    }

    #[test]
    fn test_parse_grpc_path_with_nested_package() {
        let (service, method) =
            parse_grpc_path("/com.example.api.v1.UserService/GetUser").expect("nested package should parse");
        assert_eq!(service, "com.example.api.v1.UserService");
        assert_eq!(method, "GetUser");
    }

    #[test]
    fn test_parse_grpc_path_invalid_format() {
        let status = parse_grpc_path("/invalid").expect_err("single-segment path must be rejected");
        assert_eq!(status.code(), tonic::Code::InvalidArgument);
    }

    #[test]
    fn test_parse_grpc_path_empty_service() {
        assert!(parse_grpc_path("//Method").is_err());
    }

    #[test]
    fn test_parse_grpc_path_empty_method() {
        assert!(parse_grpc_path("/Service/").is_err());
    }

    #[test]
    fn test_parse_grpc_path_no_leading_slash() {
        let (service, method) =
            parse_grpc_path("package.Service/Method").expect("leading slash is optional");
        assert_eq!(service, "package.Service");
        assert_eq!(method, "Method");
    }

    // --- is_grpc_request ---------------------------------------------------

    #[test]
    fn test_is_grpc_request_valid() {
        assert!(is_grpc_request(&content_type_headers("application/grpc")));
    }

    #[test]
    fn test_is_grpc_request_with_subtype() {
        assert!(is_grpc_request(&content_type_headers("application/grpc+proto")));
    }

    #[test]
    fn test_is_grpc_request_not_grpc() {
        assert!(!is_grpc_request(&content_type_headers("application/json")));
    }

    #[test]
    fn test_is_grpc_request_no_content_type() {
        assert!(!is_grpc_request(&axum::http::HeaderMap::new()));
    }

    // --- grpc_response_to_tonic -------------------------------------------

    #[test]
    fn test_grpc_response_to_tonic_basic() {
        let converted = grpc_response_to_tonic(GrpcResponseData {
            payload: Bytes::from("response"),
            metadata: MetadataMap::new(),
        });

        assert_eq!(converted.into_inner(), Bytes::from("response"));
    }

    #[test]
    fn test_grpc_response_to_tonic_with_metadata() {
        let mut metadata = MetadataMap::new();
        metadata.insert("custom-header", "value".parse().unwrap());

        let converted = grpc_response_to_tonic(GrpcResponseData {
            payload: Bytes::from("data"),
            metadata,
        });

        assert_eq!(converted.get_ref(), &Bytes::from("data"));
        assert!(converted.metadata().get("custom-header").is_some());
    }

    // --- copy_metadata -----------------------------------------------------

    #[test]
    fn test_copy_metadata() {
        let mut source = MetadataMap::new();
        source.insert("key1", "value1".parse().unwrap());
        source.insert("key2", "value2".parse().unwrap());

        let mut dest = MetadataMap::new();
        copy_metadata(&source, &mut dest);

        assert_eq!(dest.get("key1").unwrap(), "value1");
        assert_eq!(dest.get("key2").unwrap(), "value2");
    }

    #[test]
    fn test_copy_metadata_empty() {
        let mut dest = MetadataMap::new();
        copy_metadata(&MetadataMap::new(), &mut dest);
        assert!(dest.is_empty());
    }

    #[test]
    fn test_copy_metadata_binary() {
        let mut source = MetadataMap::new();
        source.insert_bin("binary-key-bin", tonic::metadata::MetadataValue::from_bytes(b"binary"));

        let mut dest = MetadataMap::new();
        copy_metadata(&source, &mut dest);

        assert!(dest.get_bin("binary-key-bin").is_some());
    }
}