allframe_core/router/
grpc_prod.rs1use std::future::Future;
7#[cfg(feature = "router-grpc")]
8use std::pin::Pin;
9
10#[cfg(feature = "router-grpc")]
11use futures::Stream;
12#[cfg(feature = "router-grpc")]
13use prost::Message;
14#[cfg(feature = "router-grpc")]
15use tokio_stream::StreamExt;
16#[cfg(feature = "router-grpc")]
17use tonic::{transport::Server, Code, Status, Streaming};
18
19use super::ProtocolAdapter;
20
21#[cfg(feature = "router-grpc")]
32pub struct GrpcProductionAdapter {
33 service_name: String,
34}
35
36#[cfg(feature = "router-grpc")]
37impl GrpcProductionAdapter {
38 pub fn new(service_name: impl Into<String>) -> Self {
40 Self {
41 service_name: service_name.into(),
42 }
43 }
44
45 pub fn service_name(&self) -> &str {
47 &self.service_name
48 }
49
50 pub fn server_builder() -> Server {
52 Server::builder()
53 }
54
55 pub fn status_from_code(code: Code, message: impl Into<String>) -> Status {
57 Status::new(code, message)
58 }
59
60 pub fn enable_reflection() -> tonic_reflection::server::Builder<'static> {
67 tonic_reflection::server::Builder::configure()
68 }
69}
70
71#[cfg(feature = "router-grpc")]
72impl ProtocolAdapter for GrpcProductionAdapter {
73 fn name(&self) -> &str {
74 "grpc-production"
75 }
76
77 fn handle(
78 &self,
79 _request: &str,
80 ) -> Pin<Box<dyn Future<Output = Result<String, String>> + Send + '_>> {
81 Box::pin(async move {
82 Ok("gRPC production adapter".to_string())
88 })
89 }
90}
91
92#[cfg(feature = "router-grpc")]
94#[tonic::async_trait]
95pub trait GrpcService: Send + Sync + 'static {
96 const NAME: &'static str;
98
99 async fn handle_unary(&self, method: &str, request: Vec<u8>) -> Result<Vec<u8>, Status>;
101}
102
103#[cfg(feature = "router-grpc")]
105pub mod streaming {
106 use super::*;
107
108 pub type ServerStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
110
111 pub type ClientStream<T> = Streaming<T>;
113
114 pub type BidiStream<T> = Pin<Box<dyn Stream<Item = Result<T, Status>> + Send>>;
116
117 pub fn from_vec<T>(items: Vec<T>) -> ServerStream<T>
119 where
120 T: Send + 'static,
121 {
122 Box::pin(tokio_stream::iter(items.into_iter().map(Ok)))
123 }
124
125 pub fn from_iter<T, I>(iter: I) -> ServerStream<T>
127 where
128 T: Send + 'static,
129 I: Iterator<Item = T> + Send + 'static,
130 {
131 Box::pin(tokio_stream::iter(iter.map(Ok)))
132 }
133
134 pub async fn collect_stream<T>(mut stream: ClientStream<T>) -> Result<Vec<T>, Status>
136 where
137 T: Send + 'static,
138 {
139 let mut items = Vec::new();
140 while let Some(item) = stream.next().await {
141 items.push(item?);
142 }
143 Ok(items)
144 }
145
146 pub async fn map_stream<T, U, F>(
148 mut stream: ClientStream<T>,
149 mut f: F,
150 ) -> Result<Vec<U>, Status>
151 where
152 T: Send + 'static,
153 U: Send + 'static,
154 F: FnMut(T) -> U + Send,
155 {
156 let mut items = Vec::new();
157 while let Some(item) = stream.next().await {
158 items.push(f(item?));
159 }
160 Ok(items)
161 }
162}
163
164#[cfg(feature = "router-grpc")]
166pub mod protobuf {
167 use super::*;
168
169 pub fn encode<M: Message>(message: &M) -> Result<Vec<u8>, String> {
171 let mut buf = Vec::new();
172 message
173 .encode(&mut buf)
174 .map_err(|e| format!("Failed to encode message: {}", e))?;
175 Ok(buf)
176 }
177
178 pub fn decode<M: Message + Default>(bytes: &[u8]) -> Result<M, String> {
180 M::decode(bytes).map_err(|e| format!("Failed to decode message: {}", e))
181 }
182}
183
184#[cfg(feature = "router-grpc")]
186pub mod status {
187 use super::*;
188
189 pub fn ok() -> Status {
191 Status::ok("Success")
192 }
193
194 pub fn invalid_argument(message: impl Into<String>) -> Status {
196 Status::new(Code::InvalidArgument, message)
197 }
198
199 pub fn not_found(message: impl Into<String>) -> Status {
201 Status::new(Code::NotFound, message)
202 }
203
204 pub fn unimplemented(message: impl Into<String>) -> Status {
206 Status::new(Code::Unimplemented, message)
207 }
208
209 pub fn internal(message: impl Into<String>) -> Status {
211 Status::new(Code::Internal, message)
212 }
213}
214
215#[cfg(test)]
216#[cfg(feature = "router-grpc")]
217mod tests {
218 use super::*;
219
220 #[test]
221 fn test_grpc_adapter_creation() {
222 let adapter = GrpcProductionAdapter::new("UserService");
223 assert_eq!(adapter.service_name(), "UserService");
224 assert_eq!(adapter.name(), "grpc-production");
225 }
226
227 #[test]
228 fn test_status_codes() {
229 let ok_status = status::ok();
230 assert_eq!(ok_status.code(), Code::Ok);
231
232 let invalid = status::invalid_argument("Bad request");
233 assert_eq!(invalid.code(), Code::InvalidArgument);
234
235 let not_found = status::not_found("User not found");
236 assert_eq!(not_found.code(), Code::NotFound);
237
238 let unimplemented = status::unimplemented("Not implemented");
239 assert_eq!(unimplemented.code(), Code::Unimplemented);
240
241 let internal = status::internal("Internal error");
242 assert_eq!(internal.code(), Code::Internal);
243 }
244
245 #[tokio::test]
246 async fn test_protobuf_encoding() {
247 use prost_types::Timestamp;
249
250 let ts = Timestamp {
251 seconds: 12345,
252 nanos: 67890,
253 };
254
255 let bytes = protobuf::encode(&ts).unwrap();
257 assert!(!bytes.is_empty());
258
259 let decoded: Timestamp = protobuf::decode(&bytes).unwrap();
261 assert_eq!(decoded.seconds, 12345);
262 assert_eq!(decoded.nanos, 67890);
263 }
264
265 #[tokio::test]
266 async fn test_server_streaming_from_vec() {
267 use tokio_stream::StreamExt;
268
269 let items = vec![1, 2, 3, 4, 5];
271 let mut stream = streaming::from_vec(items);
272
273 let mut results = Vec::new();
275 while let Some(item) = stream.next().await {
276 results.push(item.unwrap());
277 }
278
279 assert_eq!(results, vec![1, 2, 3, 4, 5]);
280 }
281
282 #[tokio::test]
283 async fn test_server_streaming_from_iter() {
284 use tokio_stream::StreamExt;
285
286 let mut stream = streaming::from_iter(1..=5);
288
289 let mut results = Vec::new();
291 while let Some(item) = stream.next().await {
292 results.push(item.unwrap());
293 }
294
295 assert_eq!(results, vec![1, 2, 3, 4, 5]);
296 }
297
298 #[test]
299 fn test_grpc_service_name() {
300 let adapter = GrpcProductionAdapter::new("UserService");
301 assert_eq!(adapter.service_name(), "UserService");
302 }
303
304 #[test]
305 fn test_all_status_codes() {
306 assert_eq!(status::ok().code(), Code::Ok);
308 assert_eq!(
309 status::invalid_argument("msg").code(),
310 Code::InvalidArgument
311 );
312 assert_eq!(status::not_found("msg").code(), Code::NotFound);
313 assert_eq!(status::unimplemented("msg").code(), Code::Unimplemented);
314 assert_eq!(status::internal("msg").code(), Code::Internal);
315 }
316}