spikard_http/grpc/
mod.rs

1//! gRPC runtime support for Spikard
2//!
3//! This module provides gRPC server infrastructure using Tonic, enabling
4//! Spikard to handle both HTTP/1.1 REST requests and HTTP/2 gRPC requests.
5//!
6//! # Architecture
7//!
8//! The gRPC support follows the same language-agnostic pattern as the HTTP handler:
9//!
10//! 1. **GrpcHandler trait**: Language-agnostic interface for handling gRPC requests
11//! 2. **Service bridge**: Converts between Tonic's types and our internal representation
12//! 3. **Streaming support**: Utilities for handling streaming RPCs
13//! 4. **Server integration**: Multiplexes HTTP/1.1 and HTTP/2 traffic
14//!
15//! # Example
16//!
17//! ```ignore
//! use spikard_http::grpc::{GrpcConfig, GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData};
//! use std::future::Future;
//! use std::pin::Pin;
//! use std::sync::Arc;
20//!
21//! // Implement GrpcHandler for your language binding
22//! struct MyGrpcHandler;
23//!
24//! impl GrpcHandler for MyGrpcHandler {
25//!     fn call(&self, request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
26//!         Box::pin(async move {
27//!             // Handle the gRPC request
28//!             Ok(GrpcResponseData {
29//!                 payload: bytes::Bytes::from("response"),
30//!                 metadata: tonic::metadata::MetadataMap::new(),
31//!             })
32//!         })
33//!     }
34//!
35//!     fn service_name(&self) -> &str {
36//!         "mypackage.MyService"
37//!     }
38//! }
39//!
40//! // Register with the server
41//! let handler = Arc::new(MyGrpcHandler);
42//! let config = GrpcConfig::default();
43//! ```
44
45pub mod framing;
46pub mod handler;
47pub mod service;
48pub mod streaming;
49
50// Re-export main types
51pub use framing::parse_grpc_client_stream;
52pub use handler::{GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData, RpcMode};
53pub use service::{GenericGrpcService, copy_metadata, is_grpc_request, parse_grpc_path};
54pub use streaming::{MessageStream, StreamingRequest, StreamingResponse};
55
56use serde::{Deserialize, Serialize};
57use std::collections::HashMap;
58use std::sync::Arc;
59
60/// Configuration for gRPC support
61///
62/// Controls how the server handles gRPC requests, including compression,
63/// timeouts, and protocol settings.
64///
65/// # Stream Limits
66///
67/// This configuration enforces message-level size limits but delegates
68/// concurrent stream limiting to the HTTP/2 transport layer:
69///
70/// - **Message Size Limits**: The `max_message_size` field is enforced per
71///   individual message (request or response) in both unary and streaming RPCs.
72///   When a single message exceeds this limit, the request is rejected with
73///   `PAYLOAD_TOO_LARGE` (HTTP 413).
74///
75/// - **Concurrent Stream Limits**: The `max_concurrent_streams` is an advisory
76///   configuration passed to the HTTP/2 layer for connection-level stream
77///   negotiation. The HTTP/2 transport automatically enforces this limit and
78///   returns GOAWAY frames when exceeded. Applications should not rely on
79///   custom enforcement of this limit.
80///
81/// - **Stream Length Limits**: There is currently no built-in limit on the
82///   total number of messages in a stream. Handlers should implement their own
83///   message counting if needed. Future versions may add a `max_stream_response_bytes`
84///   field to limit total response size per stream.
85///
86/// # Example
87///
88/// ```ignore
89/// let mut config = GrpcConfig::default();
90/// config.max_message_size = 10 * 1024 * 1024; // 10MB per message
91/// config.max_concurrent_streams = 50; // Advised to HTTP/2 layer
92/// ```
93#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct GrpcConfig {
95    /// Enable gRPC support
96    #[serde(default = "default_true")]
97    pub enabled: bool,
98
99    /// Maximum message size in bytes (for both sending and receiving)
100    ///
101    /// This limit applies to individual messages in both unary and streaming RPCs.
102    /// When a single message exceeds this size, the request is rejected with HTTP 413
103    /// (Payload Too Large).
104    ///
105    /// Default: 4MB (4194304 bytes)
106    ///
107    /// # Note
108    /// This limit does NOT apply to the total response size in streaming RPCs.
109    /// For multi-message streams, the total response can exceed this limit as long
110    /// as each individual message stays within the limit.
111    #[serde(default = "default_max_message_size")]
112    pub max_message_size: usize,
113
114    /// Enable gzip compression for gRPC messages
115    #[serde(default = "default_true")]
116    pub enable_compression: bool,
117
118    /// Timeout for gRPC requests in seconds (None = no timeout)
119    #[serde(default)]
120    pub request_timeout: Option<u64>,
121
122    /// Maximum number of concurrent streams per connection (HTTP/2 advisory)
123    ///
124    /// This value is communicated to HTTP/2 clients as the server's flow control limit.
125    /// The HTTP/2 transport layer enforces this limit automatically via SETTINGS frames
126    /// and GOAWAY responses. Applications should NOT implement custom enforcement.
127    ///
128    /// Default: 100 streams per connection
129    ///
130    /// # Stream Limiting Strategy
131    /// - **Per Connection**: This limit applies per HTTP/2 connection, not globally
132    /// - **Transport Enforcement**: HTTP/2 handles all stream limiting; applications
133    ///   need not implement custom checks
134    /// - **Streaming Requests**: In server streaming or bidi streaming, each logical
135    ///   RPC consumes one stream slot. Message ordering within a stream follows
136    ///   HTTP/2 frame ordering.
137    ///
138    /// # Future Enhancement
139    /// A future `max_stream_response_bytes` field may be added to limit the total
140    /// response size in streaming RPCs (separate from per-message limits).
141    #[serde(default = "default_max_concurrent_streams")]
142    pub max_concurrent_streams: u32,
143
144    /// Enable HTTP/2 keepalive
145    #[serde(default = "default_true")]
146    pub enable_keepalive: bool,
147
148    /// HTTP/2 keepalive interval in seconds
149    #[serde(default = "default_keepalive_interval")]
150    pub keepalive_interval: u64,
151
152    /// HTTP/2 keepalive timeout in seconds
153    #[serde(default = "default_keepalive_timeout")]
154    pub keepalive_timeout: u64,
155    // TODO: Consider adding in future versions:
156    // pub max_stream_response_bytes: Option<usize>,  // Total bytes per streaming response
157}
158
159impl Default for GrpcConfig {
160    fn default() -> Self {
161        Self {
162            enabled: true,
163            max_message_size: default_max_message_size(),
164            enable_compression: true,
165            request_timeout: None,
166            max_concurrent_streams: default_max_concurrent_streams(),
167            enable_keepalive: true,
168            keepalive_interval: default_keepalive_interval(),
169            keepalive_timeout: default_keepalive_timeout(),
170        }
171    }
172}
173
/// Serde default helper for boolean options that are switched on unless the
/// configuration explicitly disables them.
const fn default_true() -> bool {
    true
}
177
/// Default per-message size limit: 4 MiB, expressed in bytes.
const fn default_max_message_size() -> usize {
    const MIB: usize = 1024 * 1024;
    4 * MIB
}
181
/// Default number of concurrent streams allowed per HTTP/2 connection.
const fn default_max_concurrent_streams() -> u32 {
    const STREAMS_PER_CONNECTION: u32 = 100;
    STREAMS_PER_CONNECTION
}
185
/// Default HTTP/2 keepalive interval, in seconds.
const fn default_keepalive_interval() -> u64 {
    const INTERVAL_SECS: u64 = 75;
    INTERVAL_SECS
}
189
/// Default HTTP/2 keepalive timeout, in seconds.
const fn default_keepalive_timeout() -> u64 {
    const TIMEOUT_SECS: u64 = 20;
    TIMEOUT_SECS
}
193
194/// Registry for gRPC handlers
195///
196/// Maps service names to their handlers and RPC modes. Used by the server to route
197/// incoming gRPC requests to the appropriate handler method based on RPC mode.
198///
199/// # Example
200///
201/// ```ignore
202/// use spikard_http::grpc::{GrpcRegistry, RpcMode};
203/// use std::sync::Arc;
204///
205/// let mut registry = GrpcRegistry::new();
206/// registry.register("mypackage.UserService", Arc::new(user_handler), RpcMode::Unary);
207/// registry.register("mypackage.StreamService", Arc::new(stream_handler), RpcMode::ServerStreaming);
208/// ```
209type GrpcHandlerEntry = (Arc<dyn GrpcHandler>, RpcMode);
210
211#[derive(Clone)]
212pub struct GrpcRegistry {
213    handlers: Arc<HashMap<String, GrpcHandlerEntry>>,
214}
215
216impl GrpcRegistry {
217    /// Create a new empty gRPC handler registry
218    pub fn new() -> Self {
219        Self {
220            handlers: Arc::new(HashMap::new()),
221        }
222    }
223
224    /// Register a gRPC handler for a service
225    ///
226    /// # Arguments
227    ///
228    /// * `service_name` - Fully qualified service name (e.g., "mypackage.MyService")
229    /// * `handler` - Handler implementation for this service
230    /// * `rpc_mode` - The RPC mode this handler supports (Unary, ServerStreaming, etc.)
231    pub fn register(&mut self, service_name: impl Into<String>, handler: Arc<dyn GrpcHandler>, rpc_mode: RpcMode) {
232        let handlers = Arc::make_mut(&mut self.handlers);
233        handlers.insert(service_name.into(), (handler, rpc_mode));
234    }
235
236    /// Get a handler and its RPC mode by service name
237    ///
238    /// Returns both the handler and the RPC mode it was registered with,
239    /// allowing the router to dispatch to the appropriate handler method.
240    pub fn get(&self, service_name: &str) -> Option<(Arc<dyn GrpcHandler>, RpcMode)> {
241        self.handlers.get(service_name).cloned()
242    }
243
244    /// Get all registered service names
245    pub fn service_names(&self) -> Vec<String> {
246        self.handlers.keys().cloned().collect()
247    }
248
249    /// Check if a service is registered
250    pub fn contains(&self, service_name: &str) -> bool {
251        self.handlers.contains_key(service_name)
252    }
253
254    /// Get the number of registered services
255    pub fn len(&self) -> usize {
256        self.handlers.len()
257    }
258
259    /// Check if the registry is empty
260    pub fn is_empty(&self) -> bool {
261        self.handlers.is_empty()
262    }
263}
264
265impl Default for GrpcRegistry {
266    fn default() -> Self {
267        Self::new()
268    }
269}
270
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grpc::handler::{GrpcHandler, GrpcHandlerResult, GrpcRequestData};
    use std::future::Future;
    use std::pin::Pin;

    /// Minimal handler used to populate the registry in tests: it ignores the
    /// request and answers with an empty payload and empty metadata.
    struct TestHandler;

    impl GrpcHandler for TestHandler {
        fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
            Box::pin(async {
                Ok(GrpcResponseData {
                    payload: bytes::Bytes::new(),
                    metadata: tonic::metadata::MetadataMap::new(),
                })
            })
        }

        fn service_name(&self) -> &'static str {
            // The test handler is stateless, so it reports a fixed service name.
            "test.Service"
        }
    }

    #[test]
    fn test_grpc_config_default() {
        let config = GrpcConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_message_size, 4 * 1024 * 1024);
        assert!(config.enable_compression);
        assert!(config.request_timeout.is_none());
        assert_eq!(config.max_concurrent_streams, 100);
        assert!(config.enable_keepalive);
        assert_eq!(config.keepalive_interval, 75);
        assert_eq!(config.keepalive_timeout, 20);
    }

    #[test]
    fn test_grpc_config_serialization() {
        let config = GrpcConfig::default();
        let json = serde_json::to_string(&config).expect("default config serializes to JSON");
        let deserialized: GrpcConfig = serde_json::from_str(&json).expect("serialized config parses back");

        // Compare every field so a future field that fails to round-trip is caught.
        assert_eq!(config.enabled, deserialized.enabled);
        assert_eq!(config.max_message_size, deserialized.max_message_size);
        assert_eq!(config.enable_compression, deserialized.enable_compression);
        assert_eq!(config.request_timeout, deserialized.request_timeout);
        assert_eq!(config.max_concurrent_streams, deserialized.max_concurrent_streams);
        assert_eq!(config.enable_keepalive, deserialized.enable_keepalive);
        assert_eq!(config.keepalive_interval, deserialized.keepalive_interval);
        assert_eq!(config.keepalive_timeout, deserialized.keepalive_timeout);
    }

    #[test]
    fn test_grpc_config_deserialize_empty_uses_defaults() {
        // Every field has a serde default, so an empty object must match `Default`.
        let parsed: GrpcConfig = serde_json::from_str("{}").expect("empty object is a valid config");
        let expected = GrpcConfig::default();

        assert_eq!(parsed.enabled, expected.enabled);
        assert_eq!(parsed.max_message_size, expected.max_message_size);
        assert_eq!(parsed.enable_compression, expected.enable_compression);
        assert_eq!(parsed.request_timeout, expected.request_timeout);
        assert_eq!(parsed.max_concurrent_streams, expected.max_concurrent_streams);
        assert_eq!(parsed.enable_keepalive, expected.enable_keepalive);
        assert_eq!(parsed.keepalive_interval, expected.keepalive_interval);
        assert_eq!(parsed.keepalive_timeout, expected.keepalive_timeout);
    }

    #[test]
    fn test_grpc_registry_new() {
        let registry = GrpcRegistry::new();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn test_grpc_registry_register() {
        let mut registry = GrpcRegistry::new();
        let handler = Arc::new(TestHandler);

        registry.register("test.Service", handler, RpcMode::Unary);

        assert!(!registry.is_empty());
        assert_eq!(registry.len(), 1);
        assert!(registry.contains("test.Service"));
    }

    #[test]
    fn test_grpc_registry_register_replaces_existing() {
        let mut registry = GrpcRegistry::new();

        registry.register("test.Service", Arc::new(TestHandler), RpcMode::Unary);
        registry.register("test.Service", Arc::new(TestHandler), RpcMode::ServerStreaming);

        // Re-registering under the same name keeps a single entry with the latest mode.
        assert_eq!(registry.len(), 1);
        let (_, mode) = registry.get("test.Service").expect("service stays registered");
        assert_eq!(mode, RpcMode::ServerStreaming);
    }

    #[test]
    fn test_grpc_registry_get() {
        let mut registry = GrpcRegistry::new();
        let handler = Arc::new(TestHandler);

        registry.register("test.Service", handler, RpcMode::Unary);

        let (handler, rpc_mode) = registry
            .get("test.Service")
            .expect("registered service should be retrievable");
        assert_eq!(handler.service_name(), "test.Service");
        assert_eq!(rpc_mode, RpcMode::Unary);
    }

    #[test]
    fn test_grpc_registry_get_nonexistent() {
        let registry = GrpcRegistry::new();
        let result = registry.get("nonexistent.Service");
        assert!(result.is_none());
    }

    #[test]
    fn test_grpc_registry_service_names() {
        let mut registry = GrpcRegistry::new();

        registry.register("service1", Arc::new(TestHandler), RpcMode::Unary);
        registry.register("service2", Arc::new(TestHandler), RpcMode::ServerStreaming);
        registry.register("service3", Arc::new(TestHandler), RpcMode::Unary);

        // The map has no stable iteration order, so sort before comparing.
        let mut names = registry.service_names();
        names.sort();

        assert_eq!(names, vec!["service1", "service2", "service3"]);
    }

    #[test]
    fn test_grpc_registry_contains() {
        let mut registry = GrpcRegistry::new();
        registry.register("test.Service", Arc::new(TestHandler), RpcMode::Unary);

        assert!(registry.contains("test.Service"));
        assert!(!registry.contains("other.Service"));
    }

    #[test]
    fn test_grpc_registry_multiple_services() {
        let mut registry = GrpcRegistry::new();

        registry.register("user.Service", Arc::new(TestHandler), RpcMode::Unary);
        registry.register("post.Service", Arc::new(TestHandler), RpcMode::ServerStreaming);

        assert_eq!(registry.len(), 2);
        assert!(registry.contains("user.Service"));
        assert!(registry.contains("post.Service"));
    }

    #[test]
    fn test_grpc_registry_clone() {
        let mut registry = GrpcRegistry::new();
        registry.register("test.Service", Arc::new(TestHandler), RpcMode::Unary);

        let cloned = registry.clone();

        assert_eq!(cloned.len(), 1);
        assert!(cloned.contains("test.Service"));
    }

    #[test]
    fn test_grpc_registry_clone_is_independent() {
        let mut original = GrpcRegistry::new();
        original.register("a.Service", Arc::new(TestHandler), RpcMode::Unary);

        let mut cloned = original.clone();
        cloned.register("b.Service", Arc::new(TestHandler), RpcMode::ServerStreaming);

        // Registering on the clone must not leak into the registry it was cloned from.
        assert_eq!(original.len(), 1);
        assert!(!original.contains("b.Service"));
        assert_eq!(cloned.len(), 2);
        assert!(cloned.contains("a.Service"));
        assert!(cloned.contains("b.Service"));
    }

    #[test]
    fn test_grpc_registry_default() {
        let registry = GrpcRegistry::default();
        assert!(registry.is_empty());
    }

    #[test]
    fn test_grpc_registry_rpc_mode_storage() {
        let mut registry = GrpcRegistry::new();

        registry.register("unary.Service", Arc::new(TestHandler), RpcMode::Unary);
        registry.register("server_stream.Service", Arc::new(TestHandler), RpcMode::ServerStreaming);
        registry.register("client_stream.Service", Arc::new(TestHandler), RpcMode::ClientStreaming);
        registry.register("bidi.Service", Arc::new(TestHandler), RpcMode::BidirectionalStreaming);

        let (_, mode) = registry.get("unary.Service").unwrap();
        assert_eq!(mode, RpcMode::Unary);

        let (_, mode) = registry.get("server_stream.Service").unwrap();
        assert_eq!(mode, RpcMode::ServerStreaming);

        let (_, mode) = registry.get("client_stream.Service").unwrap();
        assert_eq!(mode, RpcMode::ClientStreaming);

        let (_, mode) = registry.get("bidi.Service").unwrap();
        assert_eq!(mode, RpcMode::BidirectionalStreaming);
    }
}