spikard_http/grpc/service.rs
1//! Tonic service bridge
2//!
3//! This module bridges Tonic's service traits with our GrpcHandler trait.
4//! It handles the conversion between Tonic's types and our internal representation,
5//! enabling language-agnostic gRPC handling.
6
7use crate::grpc::handler::{GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData};
8use crate::grpc::streaming::MessageStream;
9use bytes::Bytes;
10use futures_util::StreamExt;
11use std::sync::Arc;
12use tonic::{Request, Response, Status};
13
14/// Generic gRPC service that routes requests to a GrpcHandler
15///
16/// This service implements Tonic's server traits and routes all requests
17/// to the provided GrpcHandler implementation. It handles serialization
18/// at the boundary between Tonic and our handler trait.
19///
20/// # Example
21///
22/// ```ignore
23/// use spikard_http::grpc::service::GenericGrpcService;
24/// use std::sync::Arc;
25///
26/// let handler = Arc::new(MyGrpcHandler);
27/// let service = GenericGrpcService::new(handler);
28/// ```
29pub struct GenericGrpcService {
30 handler: Arc<dyn GrpcHandler>,
31}
32
33impl GenericGrpcService {
34 /// Create a new generic gRPC service with the given handler
35 pub fn new(handler: Arc<dyn GrpcHandler>) -> Self {
36 Self { handler }
37 }
38
39 /// Handle a unary RPC call
40 ///
41 /// Converts the Tonic Request into our GrpcRequestData format,
42 /// calls the handler, and converts the result back to a Tonic Response.
43 ///
44 /// # Arguments
45 ///
46 /// * `service_name` - Fully qualified service name
47 /// * `method_name` - Method name
48 /// * `request` - Tonic request containing the serialized protobuf message
49 pub async fn handle_unary(
50 &self,
51 service_name: String,
52 method_name: String,
53 request: Request<Bytes>,
54 ) -> Result<Response<Bytes>, Status> {
55 // Extract metadata and payload from Tonic request
56 let (metadata, _extensions, payload) = request.into_parts();
57
58 // Create our internal request representation
59 let grpc_request = GrpcRequestData {
60 service_name,
61 method_name,
62 payload,
63 metadata,
64 };
65
66 // Call the handler
67 let result: GrpcHandlerResult = self.handler.call(grpc_request).await;
68
69 // Convert result to Tonic response
70 match result {
71 Ok(grpc_response) => {
72 let mut response = Response::new(grpc_response.payload);
73 copy_metadata(&grpc_response.metadata, response.metadata_mut());
74 Ok(response)
75 }
76 Err(status) => Err(status),
77 }
78 }
79
80 /// Handle a server streaming RPC call
81 ///
82 /// Takes a single request and returns a stream of response messages.
83 /// Converts the Tonic Request into our GrpcRequestData format, calls the
84 /// handler's call_server_stream method, and converts the MessageStream
85 /// into a Tonic streaming response body.
86 ///
87 /// # Arguments
88 ///
89 /// * `service_name` - Fully qualified service name
90 /// * `method_name` - Method name
91 /// * `request` - Tonic request containing the serialized protobuf message
92 ///
93 /// # Returns
94 ///
95 /// A Response with a streaming body containing the message stream
96 ///
97 /// # Error Propagation Limitations
98 ///
99 /// When a stream returns an error mid-stream (after messages have begun
100 /// being sent), the error may not be perfectly transmitted to the client
101 /// as a gRPC trailer. This is due to limitations in Axum's `Body::from_stream`:
102 ///
103 /// - **Pre-stream errors** (before any messages): Properly converted to
104 /// HTTP status codes and returned to the client
105 /// - **Mid-stream errors** (after messages have begun): The error is converted
106 /// to a generic `BoxError`, and the stream terminates. The connection is
107 /// properly closed, but the gRPC status code metadata is lost.
108 ///
109 /// For robust error handling in streaming RPCs:
110 /// - Prefer detecting errors early (before sending messages) when possible
111 /// - Include error information in the message stream itself if critical
112 /// (application-level error messages in the protobuf)
113 /// - For true gRPC trailer support, consider implementing a custom Axum
114 /// body type that wraps the stream and can inject trailers on error
115 ///
116 /// See: <https://github.com/tokio-rs/axum/discussions/2043>
117 pub async fn handle_server_stream(
118 &self,
119 service_name: String,
120 method_name: String,
121 request: Request<Bytes>,
122 ) -> Result<Response<axum::body::Body>, Status> {
123 // Extract metadata and payload from Tonic request
124 let (metadata, _extensions, payload) = request.into_parts();
125
126 // Create our internal request representation
127 let grpc_request = GrpcRequestData {
128 service_name,
129 method_name,
130 payload,
131 metadata,
132 };
133
134 // Call the handler's server streaming method
135 let message_stream: MessageStream = self.handler.call_server_stream(grpc_request).await?;
136
137 // Convert MessageStream to axum Body
138 //
139 // LIMITATION: When converting tonic::Status errors from the stream,
140 // we lose the gRPC status metadata. The Status is converted to a
141 // generic Box<dyn Error>, and Axum's Body::from_stream doesn't have
142 // special handling for gRPC error semantics.
143 //
144 // Current behavior:
145 // - Stream errors are converted to BoxError
146 // - Body stream terminates on the first error
147 // - Connection is properly closed
148 // - Error metadata (status code, message) is not transmitted to client
149 //
150 // TODO: Implement custom Body wrapper that can:
151 // 1. Capture tonic::Status errors
152 // 2. Extract status code and message
153 // 3. Inject gRPC trailers (grpc-status, grpc-message) when stream ends
154 // 4. Properly signal error to client while preserving partial messages
155 //
156 // This would require implementing a custom StreamBody or similar that
157 // understands gRPC error semantics.
158 let byte_stream =
159 message_stream.map(|result| result.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>));
160
161 let body = axum::body::Body::from_stream(byte_stream);
162
163 // Create response with streaming body
164 let response = Response::new(body);
165
166 Ok(response)
167 }
168
169 /// Handle a client streaming RPC call
170 ///
171 /// Takes a request body stream of protobuf messages and returns a single response.
172 /// Parses the HTTP/2 body stream using gRPC frame parser, creates a MessageStream,
173 /// calls the handler's call_client_stream method, and converts the GrpcResponseData
174 /// back to a Tonic Response.
175 ///
176 /// # Arguments
177 ///
178 /// * `service_name` - Fully qualified service name
179 /// * `method_name` - Method name
180 /// * `request` - Axum request with streaming body containing HTTP/2 framed protobuf messages
181 /// * `max_message_size` - Maximum size per message (bytes)
182 ///
183 /// # Returns
184 ///
185 /// A Response with a single message body
186 ///
187 /// # Stream Handling
188 ///
189 /// The request body stream contains framed protobuf messages. Each frame is parsed
190 /// and validated for size:
191 /// - Messages within `max_message_size` are passed to the handler
192 /// - Messages exceeding the limit result in a ResourceExhausted error
193 /// - Invalid frames result in InvalidArgument errors
194 /// - The stream terminates when the client closes the write side
195 ///
196 /// # Frame Format
197 ///
198 /// Frames follow the gRPC HTTP/2 protocol format:
199 /// - 1 byte: compression flag (0 = uncompressed)
200 /// - 4 bytes: message size (big-endian)
201 /// - N bytes: message payload
202 ///
203 /// # Metadata and Trailers
204 ///
205 /// - Request metadata (headers) from the Tonic request is passed to the handler
206 /// - Response metadata from the handler is included in the response headers
207 /// - gRPC trailers (like grpc-status) should be handled by the caller
208 pub async fn handle_client_stream(
209 &self,
210 service_name: String,
211 method_name: String,
212 request: Request<axum::body::Body>,
213 max_message_size: usize,
214 ) -> Result<Response<Bytes>, Status> {
215 // Extract metadata and body from Tonic request
216 let (metadata, _extensions, body) = request.into_parts();
217
218 // Parse HTTP/2 body into stream of gRPC frames with size validation
219 let message_stream = crate::grpc::framing::parse_grpc_client_stream(body, max_message_size).await?;
220
221 // Create our internal streaming request representation
222 let streaming_request = crate::grpc::streaming::StreamingRequest {
223 service_name,
224 method_name,
225 message_stream,
226 metadata,
227 };
228
229 // Call the handler's client streaming method
230 let response: crate::grpc::handler::GrpcHandlerResult =
231 self.handler.call_client_stream(streaming_request).await;
232
233 // Convert result to Tonic response
234 match response {
235 Ok(grpc_response) => {
236 let mut tonic_response = Response::new(grpc_response.payload);
237 copy_metadata(&grpc_response.metadata, tonic_response.metadata_mut());
238 Ok(tonic_response)
239 }
240 Err(status) => Err(status),
241 }
242 }
243
244 /// Handle a bidirectional streaming RPC call
245 ///
246 /// Takes a request body stream and returns a stream of response messages.
247 /// Parses the HTTP/2 body stream using gRPC frame parser, creates a StreamingRequest,
248 /// calls the handler's call_bidi_stream method, and converts the MessageStream
249 /// back to an Axum streaming response body.
250 ///
251 /// # Arguments
252 ///
253 /// * `service_name` - Fully qualified service name
254 /// * `method_name` - Method name
255 /// * `request` - Axum request with streaming body containing HTTP/2 framed protobuf messages
256 /// * `max_message_size` - Maximum size per message (bytes)
257 ///
258 /// # Returns
259 ///
260 /// A Response with a streaming body containing response messages
261 ///
262 /// # Stream Handling
263 ///
264 /// - Request stream: Parsed from HTTP/2 body using frame parser
265 /// - Response stream: Converted from MessageStream to Axum Body
266 /// - Both streams are independent (full-duplex)
267 /// - Errors in either stream are propagated appropriately
268 ///
269 /// # Error Propagation
270 ///
271 /// Similar to server streaming, mid-stream errors in the response may not be
272 /// perfectly transmitted as gRPC trailers due to Axum Body::from_stream limitations.
273 /// See handle_server_stream() documentation for details.
274 pub async fn handle_bidi_stream(
275 &self,
276 service_name: String,
277 method_name: String,
278 request: Request<axum::body::Body>,
279 max_message_size: usize,
280 ) -> Result<Response<axum::body::Body>, Status> {
281 // Extract metadata and body from Tonic request
282 let (metadata, _extensions, body) = request.into_parts();
283
284 // Parse HTTP/2 body into stream of gRPC frames with size validation
285 let message_stream = crate::grpc::framing::parse_grpc_client_stream(body, max_message_size).await?;
286
287 // Create our internal streaming request representation
288 let streaming_request = crate::grpc::streaming::StreamingRequest {
289 service_name,
290 method_name,
291 message_stream,
292 metadata,
293 };
294
295 // Call the handler's bidirectional streaming method
296 let response_stream: MessageStream = self.handler.call_bidi_stream(streaming_request).await?;
297
298 // Convert MessageStream to axum Body (same as server streaming)
299 let byte_stream =
300 response_stream.map(|result| result.map_err(|e| Box::new(e) as Box<dyn std::error::Error + Send + Sync>));
301
302 let body = axum::body::Body::from_stream(byte_stream);
303 let response = Response::new(body);
304
305 Ok(response)
306 }
307
308 /// Get the service name from the handler
309 pub fn service_name(&self) -> &str {
310 self.handler.service_name()
311 }
312}
313
314/// Helper function to parse gRPC path into service and method names
315///
316/// gRPC paths follow the format: `/<package>.<service>/<method>`
317///
318/// # Example
319///
320/// ```ignore
321/// use spikard_http::grpc::service::parse_grpc_path;
322///
323/// let (service, method) = parse_grpc_path("/mypackage.UserService/GetUser").unwrap();
324/// assert_eq!(service, "mypackage.UserService");
325/// assert_eq!(method, "GetUser");
326/// ```
327pub fn parse_grpc_path(path: &str) -> Result<(String, String), Status> {
328 // gRPC paths are in the format: /<package>.<service>/<method>
329 let path = path.trim_start_matches('/');
330 let parts: Vec<&str> = path.split('/').collect();
331
332 if parts.len() != 2 {
333 return Err(Status::invalid_argument(format!("Invalid gRPC path: {}", path)));
334 }
335
336 let service_name = parts[0].to_string();
337 let method_name = parts[1].to_string();
338
339 if service_name.is_empty() || method_name.is_empty() {
340 return Err(Status::invalid_argument("Service or method name is empty"));
341 }
342
343 Ok((service_name, method_name))
344}
345
346/// Check if a request is a gRPC request
347///
348/// Checks the content-type header for "application/grpc" prefix.
349///
350/// # Example
351///
352/// ```ignore
353/// use spikard_http::grpc::service::is_grpc_request;
354/// use axum::http::HeaderMap;
355///
356/// let mut headers = HeaderMap::new();
357/// headers.insert("content-type", "application/grpc".parse().unwrap());
358///
359/// assert!(is_grpc_request(&headers));
360/// ```
361pub fn is_grpc_request(headers: &axum::http::HeaderMap) -> bool {
362 headers
363 .get(axum::http::header::CONTENT_TYPE)
364 .and_then(|v| v.to_str().ok())
365 .map(|v| v.starts_with("application/grpc"))
366 .unwrap_or(false)
367}
368
369/// Copy metadata from source to destination MetadataMap
370///
371/// Efficiently copies all metadata entries (both ASCII and binary)
372/// from one MetadataMap to another without unnecessary allocations.
373///
374/// # Arguments
375///
376/// * `source` - Source metadata to copy from
377/// * `dest` - Destination metadata to copy into
378pub fn copy_metadata(source: &tonic::metadata::MetadataMap, dest: &mut tonic::metadata::MetadataMap) {
379 for key_value in source.iter() {
380 match key_value {
381 tonic::metadata::KeyAndValueRef::Ascii(key, value) => {
382 dest.insert(key, value.clone());
383 }
384 tonic::metadata::KeyAndValueRef::Binary(key, value) => {
385 dest.insert_bin(key, value.clone());
386 }
387 }
388 }
389}
390
391/// Convert GrpcResponseData to Tonic Response
392///
393/// Helper function to convert our internal response representation
394/// to a Tonic Response.
395pub fn grpc_response_to_tonic(response: GrpcResponseData) -> Response<Bytes> {
396 let mut tonic_response = Response::new(response.payload);
397 copy_metadata(&response.metadata, tonic_response.metadata_mut());
398 tonic_response
399}
400
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grpc::handler::GrpcHandler;
    use std::future::Future;
    use std::pin::Pin;
    use tonic::metadata::MetadataMap;

    /// Handler that echoes the request payload back with empty metadata.
    struct TestHandler;

    impl GrpcHandler for TestHandler {
        fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
            Box::pin(async move {
                let GrpcRequestData { payload, .. } = request;
                Ok(GrpcResponseData {
                    payload,
                    metadata: MetadataMap::new(),
                })
            })
        }

        fn service_name(&self) -> &str {
            "test.TestService"
        }
    }

    /// Service wired to the echoing [`TestHandler`].
    fn echo_service() -> GenericGrpcService {
        let handler: Arc<dyn GrpcHandler> = Arc::new(TestHandler);
        GenericGrpcService::new(handler)
    }

    /// Header map containing only the given `content-type` value.
    fn headers_with_content_type(value: &str) -> axum::http::HeaderMap {
        let mut headers = axum::http::HeaderMap::new();
        headers.insert(axum::http::header::CONTENT_TYPE, value.parse().unwrap());
        headers
    }

    #[tokio::test]
    async fn test_generic_grpc_service_handle_unary() {
        let service = echo_service();

        let outcome = service
            .handle_unary(
                "test.TestService".to_string(),
                "TestMethod".to_string(),
                Request::new(Bytes::from("test payload")),
            )
            .await;

        assert!(outcome.is_ok());
        let echoed = outcome.unwrap().into_inner();
        assert_eq!(echoed, Bytes::from("test payload"));
    }

    #[tokio::test]
    async fn test_generic_grpc_service_with_metadata() {
        let service = echo_service();

        let mut request = Request::new(Bytes::from("payload"));
        let custom_value = "custom-value".parse().unwrap();
        request.metadata_mut().insert("custom-header", custom_value);

        let outcome = service
            .handle_unary("test.TestService".to_string(), "TestMethod".to_string(), request)
            .await;

        assert!(outcome.is_ok());
    }

    #[test]
    fn test_parse_grpc_path_valid() {
        let parsed = parse_grpc_path("/mypackage.UserService/GetUser").unwrap();
        assert_eq!(parsed.0, "mypackage.UserService");
        assert_eq!(parsed.1, "GetUser");
    }

    #[test]
    fn test_parse_grpc_path_with_nested_package() {
        let parsed = parse_grpc_path("/com.example.api.v1.UserService/GetUser").unwrap();
        assert_eq!(parsed.0, "com.example.api.v1.UserService");
        assert_eq!(parsed.1, "GetUser");
    }

    #[test]
    fn test_parse_grpc_path_invalid_format() {
        let outcome = parse_grpc_path("/invalid");
        assert!(outcome.is_err());
        assert_eq!(outcome.unwrap_err().code(), tonic::Code::InvalidArgument);
    }

    #[test]
    fn test_parse_grpc_path_empty_service() {
        assert!(parse_grpc_path("//Method").is_err());
    }

    #[test]
    fn test_parse_grpc_path_empty_method() {
        assert!(parse_grpc_path("/Service/").is_err());
    }

    #[test]
    fn test_parse_grpc_path_no_leading_slash() {
        let parsed = parse_grpc_path("package.Service/Method").unwrap();
        assert_eq!(parsed.0, "package.Service");
        assert_eq!(parsed.1, "Method");
    }

    #[test]
    fn test_is_grpc_request_valid() {
        let headers = headers_with_content_type("application/grpc");
        assert!(is_grpc_request(&headers));
    }

    #[test]
    fn test_is_grpc_request_with_subtype() {
        let headers = headers_with_content_type("application/grpc+proto");
        assert!(is_grpc_request(&headers));
    }

    #[test]
    fn test_is_grpc_request_not_grpc() {
        let headers = headers_with_content_type("application/json");
        assert!(!is_grpc_request(&headers));
    }

    #[test]
    fn test_is_grpc_request_no_content_type() {
        let headers = axum::http::HeaderMap::new();
        assert!(!is_grpc_request(&headers));
    }

    #[test]
    fn test_grpc_response_to_tonic_basic() {
        let converted = grpc_response_to_tonic(GrpcResponseData {
            payload: Bytes::from("response"),
            metadata: MetadataMap::new(),
        });

        assert_eq!(converted.into_inner(), Bytes::from("response"));
    }

    #[test]
    fn test_grpc_response_to_tonic_with_metadata() {
        let mut metadata = MetadataMap::new();
        metadata.insert("custom-header", "value".parse().unwrap());

        let converted = grpc_response_to_tonic(GrpcResponseData {
            payload: Bytes::from("data"),
            metadata,
        });

        assert_eq!(converted.get_ref(), &Bytes::from("data"));
        assert!(converted.metadata().get("custom-header").is_some());
    }

    #[test]
    fn test_generic_grpc_service_service_name() {
        let service = echo_service();
        assert_eq!(service.service_name(), "test.TestService");
    }

    #[test]
    fn test_copy_metadata() {
        let mut source = MetadataMap::new();
        source.insert("key1", "value1".parse().unwrap());
        source.insert("key2", "value2".parse().unwrap());

        let mut dest = MetadataMap::new();
        copy_metadata(&source, &mut dest);

        assert_eq!(dest.get("key1").unwrap(), "value1");
        assert_eq!(dest.get("key2").unwrap(), "value2");
    }

    #[test]
    fn test_copy_metadata_empty() {
        let mut dest = MetadataMap::new();
        copy_metadata(&MetadataMap::new(), &mut dest);
        assert!(dest.is_empty());
    }

    #[test]
    fn test_copy_metadata_binary() {
        let mut source = MetadataMap::new();
        source.insert_bin("binary-key-bin", tonic::metadata::MetadataValue::from_bytes(b"binary"));

        let mut dest = MetadataMap::new();
        copy_metadata(&source, &mut dest);

        assert!(dest.get_bin("binary-key-bin").is_some());
    }

    #[tokio::test]
    async fn test_generic_grpc_service_error_handling() {
        /// Handler whose unary call always fails with `NotFound`.
        struct ErrorHandler;

        impl GrpcHandler for ErrorHandler {
            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
                Box::pin(async { Err(Status::not_found("Resource not found")) })
            }

            fn service_name(&self) -> &str {
                "test.ErrorService"
            }
        }

        let handler: Arc<dyn GrpcHandler> = Arc::new(ErrorHandler);
        let service = GenericGrpcService::new(handler);

        let outcome = service
            .handle_unary(
                "test.ErrorService".to_string(),
                "ErrorMethod".to_string(),
                Request::new(Bytes::new()),
            )
            .await;

        assert!(outcome.is_err());
        let status = outcome.unwrap_err();
        assert_eq!(status.code(), tonic::Code::NotFound);
        assert_eq!(status.message(), "Resource not found");
    }
}