1pub mod framing;
46pub mod handler;
47pub mod service;
48pub mod streaming;
49
50pub use framing::parse_grpc_client_stream;
52pub use handler::{GrpcHandler, GrpcHandlerResult, GrpcRequestData, GrpcResponseData, RpcMode};
53pub use service::{GenericGrpcService, copy_metadata, is_grpc_request, parse_grpc_path};
54pub use streaming::{MessageStream, StreamingRequest, StreamingResponse};
55
56use serde::{Deserialize, Serialize};
57use std::collections::{HashMap, HashSet};
58use std::sync::Arc;
59
60#[derive(Debug, Clone, Serialize, Deserialize)]
94pub struct GrpcConfig {
95 #[serde(default = "default_true")]
97 pub enabled: bool,
98
99 #[serde(default = "default_max_message_size")]
112 pub max_message_size: usize,
113
114 #[serde(default = "default_true")]
116 pub enable_compression: bool,
117
118 #[serde(default)]
120 pub request_timeout: Option<u64>,
121
122 #[serde(default = "default_max_concurrent_streams")]
142 pub max_concurrent_streams: u32,
143
144 #[serde(default = "default_true")]
146 pub enable_keepalive: bool,
147
148 #[serde(default = "default_keepalive_interval")]
150 pub keepalive_interval: u64,
151
152 #[serde(default = "default_keepalive_timeout")]
154 pub keepalive_timeout: u64,
155 }
158
159impl Default for GrpcConfig {
160 fn default() -> Self {
161 Self {
162 enabled: true,
163 max_message_size: default_max_message_size(),
164 enable_compression: true,
165 request_timeout: None,
166 max_concurrent_streams: default_max_concurrent_streams(),
167 enable_keepalive: true,
168 keepalive_interval: default_keepalive_interval(),
169 keepalive_timeout: default_keepalive_timeout(),
170 }
171 }
172}
173
/// Serde default helper for boolean switches that are on unless disabled.
const fn default_true() -> bool {
    const ON_BY_DEFAULT: bool = true;
    ON_BY_DEFAULT
}
177
/// Serde default helper for `max_message_size`: `4 * 1024 * 1024` (4_194_304).
const fn default_max_message_size() -> usize {
    const ONE_MEBI: usize = 1024 * 1024;
    4 * ONE_MEBI
}
181
/// Serde default helper for `max_concurrent_streams`: `100`.
const fn default_max_concurrent_streams() -> u32 {
    const DEFAULT_STREAM_LIMIT: u32 = 100;
    DEFAULT_STREAM_LIMIT
}
185
/// Serde default helper for `keepalive_interval`: `75`.
// NOTE(review): presumably seconds — the unit is not stated in this file.
const fn default_keepalive_interval() -> u64 {
    const DEFAULT_INTERVAL: u64 = 75;
    DEFAULT_INTERVAL
}
189
/// Serde default helper for `keepalive_timeout`: `20`.
// NOTE(review): presumably seconds — the unit is not stated in this file.
const fn default_keepalive_timeout() -> u64 {
    const DEFAULT_TIMEOUT: u64 = 20;
    DEFAULT_TIMEOUT
}
193
/// Value stored per registry key: the handler together with the RPC mode it
/// was registered under.
type GrpcHandlerEntry = (Arc<dyn GrpcHandler>, RpcMode);
/// Method-name key under which service-wide handlers are stored; lookups fall
/// back to this key when no handler exists for the exact method.
const WILDCARD_METHOD: &str = "*";
217
/// Registry of gRPC handlers keyed by `(service name, method name)`.
///
/// The map is held behind an `Arc`, so cloning a registry copies only the
/// pointer. Mutation goes through `Arc::make_mut`, which clones the map first
/// when it is shared with other registry clones.
#[derive(Clone)]
pub struct GrpcRegistry {
    // Key is `(service_name, method_name)`; service-wide handlers use
    // `WILDCARD_METHOD` as the method name.
    handlers: Arc<HashMap<(String, String), GrpcHandlerEntry>>,
}
222
223impl GrpcRegistry {
224 pub fn new() -> Self {
226 Self {
227 handlers: Arc::new(HashMap::new()),
228 }
229 }
230
231 pub fn register(
240 &mut self,
241 service_name: impl Into<String>,
242 method_name: impl Into<String>,
243 handler: Arc<dyn GrpcHandler>,
244 rpc_mode: RpcMode,
245 ) {
246 let handlers = Arc::make_mut(&mut self.handlers);
247 handlers.insert((service_name.into(), method_name.into()), (handler, rpc_mode));
248 }
249
250 pub fn register_service(
256 &mut self,
257 service_name: impl Into<String>,
258 handler: Arc<dyn GrpcHandler>,
259 rpc_mode: RpcMode,
260 ) {
261 self.register(service_name, WILDCARD_METHOD, handler, rpc_mode);
262 }
263
264 pub fn get(&self, service_name: &str, method_name: &str) -> Option<(Arc<dyn GrpcHandler>, RpcMode)> {
269 self.handlers
270 .get(&(service_name.to_owned(), method_name.to_owned()))
271 .or_else(|| {
272 self.handlers
273 .get(&(service_name.to_owned(), WILDCARD_METHOD.to_owned()))
274 })
275 .cloned()
276 }
277
278 pub fn service_names(&self) -> Vec<String> {
280 self.handlers
281 .keys()
282 .map(|(service_name, _)| service_name.clone())
283 .collect::<HashSet<_>>()
284 .into_iter()
285 .collect()
286 }
287
288 pub fn method_names(&self, service_name: &str) -> Vec<String> {
290 self.handlers
291 .keys()
292 .filter(|(registered_service, method_name)| {
293 registered_service == service_name && method_name.as_str() != WILDCARD_METHOD
294 })
295 .map(|(_, method_name)| method_name.clone())
296 .collect()
297 }
298
299 pub fn contains(&self, service_name: &str, method_name: &str) -> bool {
301 self.handlers
302 .contains_key(&(service_name.to_owned(), method_name.to_owned()))
303 }
304
305 pub fn contains_service(&self, service_name: &str) -> bool {
307 self.handlers
308 .keys()
309 .any(|(registered_service, _)| registered_service == service_name)
310 }
311
312 pub fn len(&self) -> usize {
314 self.handlers.len()
315 }
316
317 pub fn is_empty(&self) -> bool {
319 self.handlers.is_empty()
320 }
321}
322
323impl Default for GrpcRegistry {
324 fn default() -> Self {
325 Self::new()
326 }
327}
328
#[cfg(test)]
mod tests {
    use super::*;
    use crate::grpc::handler::{GrpcHandler, GrpcHandlerResult, GrpcRequestData};
    use std::future::Future;
    use std::pin::Pin;

    /// Minimal handler that answers every call with an empty payload.
    struct TestHandler;

    impl GrpcHandler for TestHandler {
        fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
            Box::pin(async {
                Ok(GrpcResponseData {
                    payload: bytes::Bytes::new(),
                    metadata: tonic::metadata::MetadataMap::new(),
                })
            })
        }

        fn service_name(&self) -> &'static str {
            "test.Service"
        }
    }

    /// Fresh type-erased `TestHandler`, ready to hand to the registry.
    fn test_handler() -> Arc<dyn GrpcHandler> {
        Arc::new(TestHandler)
    }

    #[test]
    fn test_grpc_config_default() {
        let config = GrpcConfig::default();
        assert!(config.enabled);
        assert_eq!(config.max_message_size, 4 * 1024 * 1024);
        assert!(config.enable_compression);
        assert!(config.request_timeout.is_none());
        assert_eq!(config.max_concurrent_streams, 100);
        assert!(config.enable_keepalive);
        assert_eq!(config.keepalive_interval, 75);
        assert_eq!(config.keepalive_timeout, 20);
    }

    #[test]
    fn test_grpc_config_serialization() {
        // Every value differs from its default, so a field that was dropped on
        // the way out cannot be masked by the serde default on the way back in.
        let config = GrpcConfig {
            enabled: false,
            max_message_size: 1024,
            enable_compression: false,
            request_timeout: Some(30),
            max_concurrent_streams: 7,
            enable_keepalive: false,
            keepalive_interval: 5,
            keepalive_timeout: 3,
        };

        let json = serde_json::to_string(&config).unwrap();
        let deserialized: GrpcConfig = serde_json::from_str(&json).unwrap();

        assert_eq!(config.enabled, deserialized.enabled);
        assert_eq!(config.max_message_size, deserialized.max_message_size);
        assert_eq!(config.enable_compression, deserialized.enable_compression);
        assert_eq!(config.request_timeout, deserialized.request_timeout);
        assert_eq!(config.max_concurrent_streams, deserialized.max_concurrent_streams);
        assert_eq!(config.enable_keepalive, deserialized.enable_keepalive);
        assert_eq!(config.keepalive_interval, deserialized.keepalive_interval);
        assert_eq!(config.keepalive_timeout, deserialized.keepalive_timeout);
    }

    #[test]
    fn test_grpc_config_deserialize_empty_object_uses_defaults() {
        let expected = GrpcConfig::default();
        let parsed: GrpcConfig = serde_json::from_str("{}").unwrap();

        assert_eq!(parsed.enabled, expected.enabled);
        assert_eq!(parsed.max_message_size, expected.max_message_size);
        assert_eq!(parsed.enable_compression, expected.enable_compression);
        assert_eq!(parsed.request_timeout, expected.request_timeout);
        assert_eq!(parsed.max_concurrent_streams, expected.max_concurrent_streams);
        assert_eq!(parsed.enable_keepalive, expected.enable_keepalive);
        assert_eq!(parsed.keepalive_interval, expected.keepalive_interval);
        assert_eq!(parsed.keepalive_timeout, expected.keepalive_timeout);
    }

    #[test]
    fn test_grpc_registry_new() {
        let registry = GrpcRegistry::new();
        assert!(registry.is_empty());
        assert_eq!(registry.len(), 0);
    }

    #[test]
    fn test_grpc_registry_register() {
        let mut registry = GrpcRegistry::new();
        let handler = Arc::new(TestHandler);

        registry.register("test.Service", "TestMethod", handler, RpcMode::Unary);

        assert!(!registry.is_empty());
        assert_eq!(registry.len(), 1);
        assert!(registry.contains("test.Service", "TestMethod"));
    }

    #[test]
    fn test_grpc_registry_register_overwrites_existing_entry() {
        let mut registry = GrpcRegistry::new();
        registry.register("test.Service", "TestMethod", test_handler(), RpcMode::Unary);
        registry.register("test.Service", "TestMethod", test_handler(), RpcMode::ServerStreaming);

        assert_eq!(registry.len(), 1);
        let (_, mode) = registry.get("test.Service", "TestMethod").unwrap();
        assert_eq!(mode, RpcMode::ServerStreaming);
    }

    #[test]
    fn test_grpc_registry_get() {
        let mut registry = GrpcRegistry::new();
        let handler = Arc::new(TestHandler);

        registry.register("test.Service", "TestMethod", handler, RpcMode::Unary);

        let retrieved = registry.get("test.Service", "TestMethod");
        assert!(retrieved.is_some());
        let (handler, rpc_mode) = retrieved.unwrap();
        assert_eq!(handler.service_name(), "test.Service");
        assert_eq!(rpc_mode, RpcMode::Unary);
    }

    #[test]
    fn test_grpc_registry_get_nonexistent() {
        let registry = GrpcRegistry::new();
        let result = registry.get("nonexistent.Service", "MissingMethod");
        assert!(result.is_none());
    }

    #[test]
    fn test_grpc_registry_service_names() {
        let mut registry = GrpcRegistry::new();

        registry.register("service1", "Method1", test_handler(), RpcMode::Unary);
        registry.register("service2", "Method2", test_handler(), RpcMode::ServerStreaming);
        registry.register("service3", "Method3", test_handler(), RpcMode::Unary);
        // A second method on an existing service must not duplicate its name.
        registry.register("service1", "Method1b", test_handler(), RpcMode::Unary);

        let mut names = registry.service_names();
        names.sort();

        assert_eq!(names, vec!["service1", "service2", "service3"]);
    }

    #[test]
    fn test_grpc_registry_method_names() {
        let mut registry = GrpcRegistry::new();
        registry.register_service("test.Service", test_handler(), RpcMode::Unary);
        registry.register("test.Service", "GetThing", test_handler(), RpcMode::Unary);
        registry.register("test.Service", "ListThings", test_handler(), RpcMode::ServerStreaming);
        registry.register("other.Service", "Unrelated", test_handler(), RpcMode::Unary);

        let mut methods = registry.method_names("test.Service");
        methods.sort();

        // The wildcard entry is not reported as a method name.
        assert_eq!(methods, vec!["GetThing", "ListThings"]);
        assert!(registry.method_names("missing.Service").is_empty());
    }

    #[test]
    fn test_grpc_registry_contains() {
        let mut registry = GrpcRegistry::new();
        registry.register("test.Service", "TestMethod", test_handler(), RpcMode::Unary);

        assert!(registry.contains("test.Service", "TestMethod"));
        assert!(!registry.contains("other.Service", "TestMethod"));
        assert!(!registry.contains("test.Service", "OtherMethod"));
    }

    #[test]
    fn test_grpc_registry_multiple_services() {
        let mut registry = GrpcRegistry::new();

        registry.register("user.Service", "GetUser", test_handler(), RpcMode::Unary);
        registry.register("post.Service", "ListPosts", test_handler(), RpcMode::ServerStreaming);

        assert_eq!(registry.len(), 2);
        assert!(registry.contains("user.Service", "GetUser"));
        assert!(registry.contains("post.Service", "ListPosts"));
    }

    #[test]
    fn test_grpc_registry_clone() {
        let mut registry = GrpcRegistry::new();
        registry.register("test.Service", "TestMethod", test_handler(), RpcMode::Unary);

        let cloned = registry.clone();

        assert_eq!(cloned.len(), 1);
        assert!(cloned.contains("test.Service", "TestMethod"));

        // Registering on the original afterwards must not leak into the clone.
        registry.register("test.Service", "LaterMethod", test_handler(), RpcMode::Unary);
        assert_eq!(registry.len(), 2);
        assert_eq!(cloned.len(), 1);
        assert!(!cloned.contains("test.Service", "LaterMethod"));
    }

    #[test]
    fn test_grpc_registry_default() {
        let registry = GrpcRegistry::default();
        assert!(registry.is_empty());
    }

    #[test]
    fn test_grpc_registry_rpc_mode_storage() {
        let mut registry = GrpcRegistry::new();

        registry.register("unary.Service", "UnaryMethod", test_handler(), RpcMode::Unary);
        registry.register(
            "server_stream.Service",
            "StreamMethod",
            test_handler(),
            RpcMode::ServerStreaming,
        );
        registry.register(
            "client_stream.Service",
            "UploadMethod",
            test_handler(),
            RpcMode::ClientStreaming,
        );
        registry.register(
            "bidi.Service",
            "ChatMethod",
            test_handler(),
            RpcMode::BidirectionalStreaming,
        );

        let (_, mode) = registry.get("unary.Service", "UnaryMethod").unwrap();
        assert_eq!(mode, RpcMode::Unary);

        let (_, mode) = registry.get("server_stream.Service", "StreamMethod").unwrap();
        assert_eq!(mode, RpcMode::ServerStreaming);

        let (_, mode) = registry.get("client_stream.Service", "UploadMethod").unwrap();
        assert_eq!(mode, RpcMode::ClientStreaming);

        let (_, mode) = registry.get("bidi.Service", "ChatMethod").unwrap();
        assert_eq!(mode, RpcMode::BidirectionalStreaming);
    }

    #[test]
    fn test_grpc_registry_service_fallback() {
        let mut registry = GrpcRegistry::new();
        registry.register_service("test.Service", test_handler(), RpcMode::Unary);

        assert!(registry.contains_service("test.Service"));
        assert!(!registry.contains_service("other.Service"));
        assert!(registry.get("test.Service", "AnyMethod").is_some());
        // The fallback must not leak across services.
        assert!(registry.get("other.Service", "AnyMethod").is_none());
        // `contains` is an exact-key check and ignores the wildcard fallback.
        assert!(!registry.contains("test.Service", "AnyMethod"));
        assert!(registry.method_names("test.Service").is_empty());
    }

    #[test]
    fn test_grpc_registry_prefers_method_specific_handler() {
        struct MethodSpecificHandler;

        impl GrpcHandler for MethodSpecificHandler {
            fn call(&self, _request: GrpcRequestData) -> Pin<Box<dyn Future<Output = GrpcHandlerResult> + Send>> {
                Box::pin(async {
                    Ok(GrpcResponseData {
                        payload: bytes::Bytes::from("method-specific"),
                        metadata: tonic::metadata::MetadataMap::new(),
                    })
                })
            }

            fn service_name(&self) -> &'static str {
                "test.Service"
            }
        }

        let mut registry = GrpcRegistry::new();
        registry.register_service("test.Service", test_handler(), RpcMode::Unary);
        registry.register(
            "test.Service",
            "GetThing",
            Arc::new(MethodSpecificHandler),
            RpcMode::ServerStreaming,
        );

        let (_, mode) = registry.get("test.Service", "GetThing").unwrap();
        assert_eq!(mode, RpcMode::ServerStreaming);
        let (_, fallback_mode) = registry.get("test.Service", "OtherThing").unwrap();
        assert_eq!(fallback_mode, RpcMode::Unary);
    }
}