allframe_core/router/
grpc.rs

1//! gRPC protocol adapter
2//!
3//! Provides gRPC support for the protocol-agnostic router.
4
5use std::{future::Future, pin::Pin};
6
7use super::ProtocolAdapter;
8
9/// gRPC method type (streaming mode)
10#[derive(Debug, Clone, PartialEq)]
11pub enum GrpcMethodType {
12    /// Unary RPC: single request, single response
13    Unary,
14    /// Client streaming: stream of requests, single response
15    ClientStreaming,
16    /// Server streaming: single request, stream of responses
17    ServerStreaming,
18    /// Bidirectional streaming: stream of requests and responses
19    BidirectionalStreaming,
20}
21
22/// gRPC service method definition
23#[derive(Debug, Clone)]
24pub struct GrpcMethod {
25    /// Service name (e.g., "UserService")
26    pub service: String,
27    /// Method name (e.g., "GetUser")
28    pub method: String,
29    /// Method type (streaming mode)
30    pub method_type: GrpcMethodType,
31    /// Handler name to call
32    pub handler: String,
33}
34
35impl GrpcMethod {
36    /// Create a new gRPC method
37    pub fn new(
38        service: impl Into<String>,
39        method: impl Into<String>,
40        method_type: GrpcMethodType,
41        handler: impl Into<String>,
42    ) -> Self {
43        Self {
44            service: service.into(),
45            method: method.into(),
46            method_type,
47            handler: handler.into(),
48        }
49    }
50
51    /// Get the fully qualified method name
52    ///
53    /// Format: "ServiceName.MethodName" (gRPC convention)
54    pub fn full_name(&self) -> String {
55        format!("{}.{}", self.service, self.method)
56    }
57}
58
59/// gRPC adapter for gRPC services and RPCs
60///
61/// Handles gRPC protocol-specific request/response transformation.
62pub struct GrpcAdapter {
63    methods: Vec<GrpcMethod>,
64}
65
66impl GrpcAdapter {
67    /// Create a new gRPC adapter
68    pub fn new() -> Self {
69        Self {
70            methods: Vec::new(),
71        }
72    }
73
74    /// Register a unary RPC method
75    pub fn unary(&mut self, service: &str, method: &str, handler: &str) -> &mut Self {
76        self.methods.push(GrpcMethod::new(
77            service,
78            method,
79            GrpcMethodType::Unary,
80            handler,
81        ));
82        self
83    }
84
85    /// Register a client streaming RPC method
86    pub fn client_streaming(&mut self, service: &str, method: &str, handler: &str) -> &mut Self {
87        self.methods.push(GrpcMethod::new(
88            service,
89            method,
90            GrpcMethodType::ClientStreaming,
91            handler,
92        ));
93        self
94    }
95
96    /// Register a server streaming RPC method
97    pub fn server_streaming(&mut self, service: &str, method: &str, handler: &str) -> &mut Self {
98        self.methods.push(GrpcMethod::new(
99            service,
100            method,
101            GrpcMethodType::ServerStreaming,
102            handler,
103        ));
104        self
105    }
106
107    /// Register a bidirectional streaming RPC method
108    pub fn bidirectional_streaming(
109        &mut self,
110        service: &str,
111        method: &str,
112        handler: &str,
113    ) -> &mut Self {
114        self.methods.push(GrpcMethod::new(
115            service,
116            method,
117            GrpcMethodType::BidirectionalStreaming,
118            handler,
119        ));
120        self
121    }
122
123    /// Find a matching method by fully qualified name
124    ///
125    /// Format: "ServiceName.MethodName"
126    pub fn match_method(&self, full_name: &str) -> Option<&GrpcMethod> {
127        self.methods.iter().find(|m| m.full_name() == full_name)
128    }
129
130    /// Parse a gRPC request string
131    ///
132    /// Format: "ServiceName.MethodName:payload"
133    /// Example: "UserService.GetUser:{\"id\":42}"
134    pub fn parse_request(&self, request: &str) -> Result<(String, String), String> {
135        if request.is_empty() {
136            return Err("Empty gRPC request".to_string());
137        }
138
139        if let Some((method, payload)) = request.split_once(':') {
140            Ok((method.to_string(), payload.to_string()))
141        } else {
142            Err("Invalid gRPC request format. Expected: ServiceName.MethodName:payload".to_string())
143        }
144    }
145
146    /// Generate .proto file for registered methods
147    ///
148    /// Generates Protocol Buffer definition from registered methods.
149    pub fn generate_proto(&self) -> String {
150        if self.methods.is_empty() {
151            return String::new();
152        }
153
154        let mut proto = String::from("syntax = \"proto3\";\n\n");
155        proto.push_str("package allframe;\n\n");
156
157        // Group methods by service
158        let mut services: std::collections::HashMap<String, Vec<&GrpcMethod>> =
159            std::collections::HashMap::new();
160        for method in &self.methods {
161            services
162                .entry(method.service.clone())
163                .or_default()
164                .push(method);
165        }
166
167        // Generate service definitions
168        for (service_name, methods) in services {
169            proto.push_str(&format!("service {} {{\n", service_name));
170            for method in methods {
171                let method_proto = match method.method_type {
172                    GrpcMethodType::Unary => {
173                        format!(
174                            "  rpc {}({}Request) returns ({}Response);\n",
175                            method.method, method.method, method.method
176                        )
177                    }
178                    GrpcMethodType::ClientStreaming => {
179                        format!(
180                            "  rpc {}(stream {}Request) returns ({}Response);\n",
181                            method.method, method.method, method.method
182                        )
183                    }
184                    GrpcMethodType::ServerStreaming => {
185                        format!(
186                            "  rpc {}({}Request) returns (stream {}Response);\n",
187                            method.method, method.method, method.method
188                        )
189                    }
190                    GrpcMethodType::BidirectionalStreaming => {
191                        format!(
192                            "  rpc {}(stream {}Request) returns (stream {}Response);\n",
193                            method.method, method.method, method.method
194                        )
195                    }
196                };
197                proto.push_str(&method_proto);
198            }
199            proto.push_str("}\n");
200        }
201
202        proto.trim().to_string()
203    }
204
205    /// Build a gRPC request
206    pub fn build_request(&self, method: &str, payload: &str) -> GrpcRequest {
207        GrpcRequest {
208            method: method.to_string(),
209            payload: payload.to_string(),
210        }
211    }
212
213    /// Format a gRPC response
214    pub fn format_response(&self, status: GrpcStatus, message: &str) -> String {
215        format!("grpc-status: {} {}", status as u32, message)
216    }
217}
218
219impl Default for GrpcAdapter {
220    fn default() -> Self {
221        Self::new()
222    }
223}
224
225impl ProtocolAdapter for GrpcAdapter {
226    fn name(&self) -> &str {
227        "grpc"
228    }
229
230    fn handle(
231        &self,
232        request: &str,
233    ) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
234        // Parse request before async block
235        let parse_result = self.parse_request(request);
236        let methods = self.methods.clone();
237
238        Box::pin(async move {
239            // Handle parse error
240            let (method_name, _payload) = match parse_result {
241                Ok(parsed) => parsed,
242                Err(e) => {
243                    let response =
244                        format!("grpc-status: {} {}", GrpcStatus::InvalidArgument as u32, e);
245                    return Ok(response);
246                }
247            };
248
249            // Find matching method
250            let matched_method = methods.iter().find(|m| m.full_name() == method_name);
251
252            match matched_method {
253                Some(method) => {
254                    // In full implementation, would call handler here
255                    // For now, return success with handler info
256                    let response = format!(
257                        "grpc-status: {} {{\"handler\":\"{}\",\"method\":\"{}\",\"type\":\"{}\"}}",
258                        GrpcStatus::Ok as u32,
259                        method.handler,
260                        method.full_name(),
261                        match method.method_type {
262                            GrpcMethodType::Unary => "unary",
263                            GrpcMethodType::ClientStreaming => "client_streaming",
264                            GrpcMethodType::ServerStreaming => "server_streaming",
265                            GrpcMethodType::BidirectionalStreaming => "bidirectional_streaming",
266                        }
267                    );
268                    Ok(response)
269                }
270                None => {
271                    // Method not found
272                    let response = format!(
273                        "grpc-status: {} Method not found: {}",
274                        GrpcStatus::Unimplemented as u32,
275                        method_name
276                    );
277                    Ok(response)
278                }
279            }
280        })
281    }
282}
283
284/// gRPC request structure
285pub struct GrpcRequest {
286    /// The RPC method name (e.g., "GetUser", "ListUsers")
287    pub method: String,
288    /// The request payload (JSON for MVP, protobuf in production)
289    pub payload: String,
290}
291
292/// gRPC status codes
293///
294/// Based on <https://grpc.io/docs/guides/status-codes/>
295#[derive(Debug, Clone, Copy, PartialEq, Eq)]
296pub enum GrpcStatus {
297    /// Success
298    Ok = 0,
299    /// Invalid argument
300    InvalidArgument = 3,
301    /// Resource not found
302    NotFound = 5,
303    /// Unimplemented method
304    Unimplemented = 12,
305    /// Internal server error
306    Internal = 13,
307}
308
309impl GrpcStatus {
310    /// Get the status code name
311    pub fn code_name(&self) -> &str {
312        match self {
313            GrpcStatus::Ok => "OK",
314            GrpcStatus::InvalidArgument => "INVALID_ARGUMENT",
315            GrpcStatus::NotFound => "NOT_FOUND",
316            GrpcStatus::Unimplemented => "UNIMPLEMENTED",
317            GrpcStatus::Internal => "INTERNAL",
318        }
319    }
320}
321
322#[cfg(test)]
323mod tests {
324    use super::*;
325
326    #[test]
327    fn test_grpc_adapter_creation() {
328        let adapter = GrpcAdapter::new();
329        assert_eq!(adapter.name(), "grpc");
330    }
331
332    #[test]
333    fn test_method_registration_unary() {
334        let mut adapter = GrpcAdapter::new();
335        adapter.unary("UserService", "GetUser", "get_user_handler");
336
337        assert_eq!(adapter.methods.len(), 1);
338        assert_eq!(adapter.methods[0].service, "UserService");
339        assert_eq!(adapter.methods[0].method, "GetUser");
340        assert_eq!(adapter.methods[0].method_type, GrpcMethodType::Unary);
341        assert_eq!(adapter.methods[0].handler, "get_user_handler");
342    }
343
344    #[test]
345    fn test_method_registration_client_streaming() {
346        let mut adapter = GrpcAdapter::new();
347        adapter.client_streaming("UserService", "CreateUsers", "create_users_handler");
348
349        assert_eq!(adapter.methods.len(), 1);
350        assert_eq!(
351            adapter.methods[0].method_type,
352            GrpcMethodType::ClientStreaming
353        );
354    }
355
356    #[test]
357    fn test_method_registration_server_streaming() {
358        let mut adapter = GrpcAdapter::new();
359        adapter.server_streaming("UserService", "ListUsers", "list_users_handler");
360
361        assert_eq!(adapter.methods.len(), 1);
362        assert_eq!(
363            adapter.methods[0].method_type,
364            GrpcMethodType::ServerStreaming
365        );
366    }
367
368    #[test]
369    fn test_method_registration_bidirectional() {
370        let mut adapter = GrpcAdapter::new();
371        adapter.bidirectional_streaming("ChatService", "Chat", "chat_handler");
372
373        assert_eq!(adapter.methods.len(), 1);
374        assert_eq!(
375            adapter.methods[0].method_type,
376            GrpcMethodType::BidirectionalStreaming
377        );
378    }
379
380    #[test]
381    fn test_method_registration_multiple() {
382        let mut adapter = GrpcAdapter::new();
383        adapter.unary("UserService", "GetUser", "get_user");
384        adapter.unary("UserService", "DeleteUser", "delete_user");
385        adapter.server_streaming("UserService", "ListUsers", "list_users");
386
387        assert_eq!(adapter.methods.len(), 3);
388    }
389
390    #[test]
391    fn test_grpc_method_full_name() {
392        let method = GrpcMethod::new("UserService", "GetUser", GrpcMethodType::Unary, "handler");
393        assert_eq!(method.full_name(), "UserService.GetUser");
394    }
395
396    #[test]
397    fn test_match_method_found() {
398        let mut adapter = GrpcAdapter::new();
399        adapter.unary("UserService", "GetUser", "get_user_handler");
400
401        let matched = adapter.match_method("UserService.GetUser");
402        assert!(matched.is_some());
403        assert_eq!(matched.unwrap().handler, "get_user_handler");
404    }
405
406    #[test]
407    fn test_match_method_not_found() {
408        let adapter = GrpcAdapter::new();
409        let matched = adapter.match_method("UserService.GetUser");
410        assert!(matched.is_none());
411    }
412
413    #[test]
414    fn test_parse_request_valid() {
415        let adapter = GrpcAdapter::new();
416        let result = adapter.parse_request("UserService.GetUser:{\"id\":42}");
417
418        assert!(result.is_ok());
419        let (method, payload) = result.unwrap();
420        assert_eq!(method, "UserService.GetUser");
421        assert_eq!(payload, r#"{"id":42}"#);
422    }
423
424    #[test]
425    fn test_parse_request_empty() {
426        let adapter = GrpcAdapter::new();
427        let result = adapter.parse_request("");
428
429        assert!(result.is_err());
430        assert!(result.unwrap_err().contains("Empty"));
431    }
432
433    #[test]
434    fn test_parse_request_invalid() {
435        let adapter = GrpcAdapter::new();
436        let result = adapter.parse_request("InvalidRequest");
437
438        assert!(result.is_err());
439        assert!(result.unwrap_err().contains("Invalid gRPC request format"));
440    }
441
442    #[test]
443    fn test_proto_generation_empty() {
444        let adapter = GrpcAdapter::new();
445        let proto = adapter.generate_proto();
446        assert_eq!(proto, "");
447    }
448
449    #[test]
450    fn test_proto_generation_unary() {
451        let mut adapter = GrpcAdapter::new();
452        adapter.unary("UserService", "GetUser", "get_user");
453        adapter.unary("UserService", "DeleteUser", "delete_user");
454
455        let proto = adapter.generate_proto();
456        assert!(proto.contains("syntax = \"proto3\";"));
457        assert!(proto.contains("service UserService {"));
458        assert!(proto.contains("rpc GetUser(GetUserRequest) returns (GetUserResponse);"));
459        assert!(proto.contains("rpc DeleteUser(DeleteUserRequest) returns (DeleteUserResponse);"));
460    }
461
462    #[test]
463    fn test_proto_generation_streaming() {
464        let mut adapter = GrpcAdapter::new();
465        adapter.client_streaming("UserService", "CreateUsers", "create_users");
466        adapter.server_streaming("UserService", "ListUsers", "list_users");
467        adapter.bidirectional_streaming("ChatService", "Chat", "chat");
468
469        let proto = adapter.generate_proto();
470        assert!(proto
471            .contains("rpc CreateUsers(stream CreateUsersRequest) returns (CreateUsersResponse);"));
472        assert!(
473            proto.contains("rpc ListUsers(ListUsersRequest) returns (stream ListUsersResponse);")
474        );
475        assert!(proto.contains("rpc Chat(stream ChatRequest) returns (stream ChatResponse);"));
476    }
477
478    #[test]
479    fn test_proto_generation_multiple_services() {
480        let mut adapter = GrpcAdapter::new();
481        adapter.unary("UserService", "GetUser", "get_user");
482        adapter.unary("PostService", "GetPost", "get_post");
483
484        let proto = adapter.generate_proto();
485        assert!(proto.contains("service UserService {"));
486        assert!(proto.contains("service PostService {"));
487    }
488
489    #[test]
490    fn test_build_request() {
491        let adapter = GrpcAdapter::new();
492        let request = adapter.build_request("UserService.GetUser", r#"{"id":42}"#);
493
494        assert_eq!(request.method, "UserService.GetUser");
495        assert_eq!(request.payload, r#"{"id":42}"#);
496    }
497
498    #[test]
499    fn test_format_response() {
500        let adapter = GrpcAdapter::new();
501        let response = adapter.format_response(GrpcStatus::Ok, "success");
502
503        assert!(response.contains("grpc-status: 0"));
504        assert!(response.contains("success"));
505    }
506
507    #[tokio::test]
508    async fn test_handle_unary_success() {
509        let mut adapter = GrpcAdapter::new();
510        adapter.unary("UserService", "GetUser", "get_user_handler");
511
512        let result = adapter.handle("UserService.GetUser:{\"id\":42}").await;
513        assert!(result.is_ok());
514
515        let response = result.unwrap();
516        assert!(response.contains("grpc-status: 0"));
517        assert!(response.contains("get_user_handler"));
518        assert!(response.contains("unary"));
519    }
520
521    #[tokio::test]
522    async fn test_handle_streaming_success() {
523        let mut adapter = GrpcAdapter::new();
524        adapter.server_streaming("UserService", "ListUsers", "list_users_handler");
525
526        let result = adapter.handle("UserService.ListUsers:{}").await;
527        assert!(result.is_ok());
528
529        let response = result.unwrap();
530        assert!(response.contains("grpc-status: 0"));
531        assert!(response.contains("list_users_handler"));
532        assert!(response.contains("server_streaming"));
533    }
534
535    #[tokio::test]
536    async fn test_handle_method_not_found() {
537        let adapter = GrpcAdapter::new();
538        let result = adapter.handle("UserService.GetUser:{}").await;
539
540        assert!(result.is_ok());
541        let response = result.unwrap();
542        assert!(response.contains("grpc-status: 12")); // UNIMPLEMENTED
543        assert!(response.contains("Method not found"));
544    }
545
546    #[tokio::test]
547    async fn test_handle_invalid_request() {
548        let adapter = GrpcAdapter::new();
549        let result = adapter.handle("InvalidRequest").await;
550
551        assert!(result.is_ok());
552        let response = result.unwrap();
553        assert!(response.contains("grpc-status: 3")); // INVALID_ARGUMENT
554    }
555
556    #[test]
557    fn test_grpc_status_codes() {
558        assert_eq!(GrpcStatus::Ok.code_name(), "OK");
559        assert_eq!(GrpcStatus::InvalidArgument.code_name(), "INVALID_ARGUMENT");
560        assert_eq!(GrpcStatus::NotFound.code_name(), "NOT_FOUND");
561        assert_eq!(GrpcStatus::Unimplemented.code_name(), "UNIMPLEMENTED");
562        assert_eq!(GrpcStatus::Internal.code_name(), "INTERNAL");
563    }
564
565    #[test]
566    fn test_grpc_method_new() {
567        let method = GrpcMethod::new("UserService", "GetUser", GrpcMethodType::Unary, "handler");
568        assert_eq!(method.service, "UserService");
569        assert_eq!(method.method, "GetUser");
570        assert_eq!(method.method_type, GrpcMethodType::Unary);
571        assert_eq!(method.handler, "handler");
572    }
573}