allframe_core/grpc/
server.rs1use std::{net::SocketAddr, time::Duration};
21
22use super::tls::TlsConfig;
23use crate::shutdown::GracefulShutdown;
24
25#[derive(Debug)]
27pub enum GrpcServerError {
28 Bind(String),
30 Tls(String),
32 Server(String),
34 Config(String),
36}
37
38impl std::fmt::Display for GrpcServerError {
39 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
40 match self {
41 GrpcServerError::Bind(msg) => write!(f, "Failed to bind: {}", msg),
42 GrpcServerError::Tls(msg) => write!(f, "TLS error: {}", msg),
43 GrpcServerError::Server(msg) => write!(f, "Server error: {}", msg),
44 GrpcServerError::Config(msg) => write!(f, "Configuration error: {}", msg),
45 }
46 }
47}
48
49impl std::error::Error for GrpcServerError {}
50
51pub struct GrpcServerBuilder {
56 addr: SocketAddr,
57 tls_config: Option<TlsConfig>,
58 reflection_descriptor: Option<&'static [u8]>,
59 health_check: bool,
60 shutdown: Option<GracefulShutdown>,
61 shutdown_timeout: Duration,
62}
63
64impl Default for GrpcServerBuilder {
65 fn default() -> Self {
66 Self::new()
67 }
68}
69
70impl GrpcServerBuilder {
71 pub fn new() -> Self {
73 Self {
74 addr: "[::1]:50051".parse().unwrap(),
75 tls_config: None,
76 reflection_descriptor: None,
77 health_check: false,
78 shutdown: None,
79 shutdown_timeout: Duration::from_secs(30),
80 }
81 }
82
83 pub fn addr(mut self, addr: impl Into<SocketAddrInput>) -> Self {
94 self.addr = addr.into().0;
95 self
96 }
97
98 pub fn port(mut self, port: u16) -> Self {
100 self.addr = SocketAddr::from(([0, 0, 0, 0, 0, 0, 0, 1], port));
101 self
102 }
103
104 pub fn tls(mut self, config: TlsConfig) -> Self {
106 self.tls_config = Some(config);
107 self
108 }
109
110 pub fn tls_from_env(mut self) -> Self {
115 self.tls_config = TlsConfig::from_env();
116 self
117 }
118
119 pub fn reflection(mut self, file_descriptor_set: &'static [u8]) -> Self {
135 self.reflection_descriptor = Some(file_descriptor_set);
136 self
137 }
138
139 pub fn health_check(mut self) -> Self {
143 self.health_check = true;
144 self
145 }
146
147 pub fn graceful_shutdown(mut self, shutdown: GracefulShutdown) -> Self {
152 self.shutdown = Some(shutdown);
153 self
154 }
155
156 pub fn shutdown_timeout(mut self, timeout: Duration) -> Self {
161 self.shutdown_timeout = timeout;
162 self
163 }
164
165 pub fn get_addr(&self) -> SocketAddr {
167 self.addr
168 }
169
170 pub fn has_tls(&self) -> bool {
172 self.tls_config.is_some()
173 }
174
175 pub fn has_reflection(&self) -> bool {
177 self.reflection_descriptor.is_some()
178 }
179
180 pub fn has_health_check(&self) -> bool {
182 self.health_check
183 }
184
185 #[cfg(feature = "router-grpc")]
208 pub async fn serve_router(
209 self,
210 router: tonic::transport::server::Router,
211 ) -> Result<(), GrpcServerError> {
212 if let Some(shutdown) = self.shutdown {
214 let mut token = shutdown.token();
215 router
216 .serve_with_shutdown(self.addr, async move {
217 token.cancelled().await;
218 })
219 .await
220 .map_err(|e| GrpcServerError::Server(e.to_string()))?;
221 } else {
222 router
223 .serve(self.addr)
224 .await
225 .map_err(|e| GrpcServerError::Server(e.to_string()))?;
226 }
227
228 Ok(())
229 }
230
231 #[cfg(feature = "router-grpc")]
236 pub fn server_builder(&self) -> tonic::transport::Server {
237 tonic::transport::Server::builder()
238 }
239
240 #[cfg(feature = "router-grpc")]
244 pub fn reflection_service(
245 &self,
246 ) -> Result<
247 Option<
248 tonic_reflection::server::v1::ServerReflectionServer<
249 impl tonic_reflection::server::v1::ServerReflection,
250 >,
251 >,
252 GrpcServerError,
253 > {
254 if let Some(fds) = self.reflection_descriptor {
255 let service = tonic_reflection::server::Builder::configure()
256 .register_encoded_file_descriptor_set(fds)
257 .build_v1()
258 .map_err(|e| GrpcServerError::Config(format!("Reflection setup failed: {}", e)))?;
259 Ok(Some(service))
260 } else {
261 Ok(None)
262 }
263 }
264
265 #[cfg(feature = "router-grpc")]
281 pub fn create_health_reporter(
282 &self,
283 ) -> Option<(
284 tonic_health::server::HealthReporter,
285 impl tonic::codegen::Service<
286 hyper::Request<hyper::body::Incoming>,
287 Response = hyper::Response<tonic::body::Body>,
288 Error = std::convert::Infallible,
289 > + Clone
290 + Send
291 + 'static,
292 )> {
293 if self.health_check {
294 Some(tonic_health::server::health_reporter())
295 } else {
296 None
297 }
298 }
299}
300
301pub struct SocketAddrInput(SocketAddr);
303
304impl From<SocketAddr> for SocketAddrInput {
305 fn from(addr: SocketAddr) -> Self {
306 Self(addr)
307 }
308}
309
310impl From<&str> for SocketAddrInput {
311 fn from(s: &str) -> Self {
312 Self(s.parse().expect("Invalid socket address"))
313 }
314}
315
316impl From<String> for SocketAddrInput {
317 fn from(s: String) -> Self {
318 Self(s.parse().expect("Invalid socket address"))
319 }
320}
321
322impl From<([u8; 4], u16)> for SocketAddrInput {
323 fn from((ip, port): ([u8; 4], u16)) -> Self {
324 Self(SocketAddr::from((ip, port)))
325 }
326}
327
328impl From<([u16; 8], u16)> for SocketAddrInput {
329 fn from((ip, port): ([u16; 8], u16)) -> Self {
330 Self(SocketAddr::from((ip, port)))
331 }
332}
333
334pub type GrpcServer = GrpcServerBuilder;
336
337#[cfg(test)]
338mod tests {
339 use super::*;
340
341 #[test]
342 fn test_builder_default() {
343 let builder = GrpcServerBuilder::new();
344 assert_eq!(builder.get_addr().port(), 50051);
345 assert!(!builder.has_tls());
346 assert!(!builder.has_reflection());
347 assert!(!builder.has_health_check());
348 }
349
350 #[test]
351 fn test_builder_addr_string() {
352 let builder = GrpcServerBuilder::new().addr("127.0.0.1:9000");
353 assert_eq!(builder.get_addr().port(), 9000);
354 }
355
356 #[test]
357 fn test_builder_port() {
358 let builder = GrpcServerBuilder::new().port(8080);
359 assert_eq!(builder.get_addr().port(), 8080);
360 }
361
362 #[test]
363 fn test_builder_tls() {
364 let tls = TlsConfig::new("/path/to/cert.pem", "/path/to/key.pem");
365 let builder = GrpcServerBuilder::new().tls(tls);
366 assert!(builder.has_tls());
367 }
368
369 #[test]
370 fn test_builder_reflection() {
371 static FDS: &[u8] = b"fake descriptor";
372 let builder = GrpcServerBuilder::new().reflection(FDS);
373 assert!(builder.has_reflection());
374 }
375
376 #[test]
377 fn test_builder_health_check() {
378 let builder = GrpcServerBuilder::new().health_check();
379 assert!(builder.has_health_check());
380 }
381
382 #[test]
383 fn test_builder_shutdown_timeout() {
384 let builder = GrpcServerBuilder::new().shutdown_timeout(Duration::from_secs(60));
385 assert_eq!(builder.shutdown_timeout, Duration::from_secs(60));
386 }
387
388 #[test]
389 fn test_grpc_server_error_display() {
390 let err = GrpcServerError::Bind("address in use".to_string());
391 assert!(err.to_string().contains("address in use"));
392
393 let err = GrpcServerError::Tls("invalid cert".to_string());
394 assert!(err.to_string().contains("invalid cert"));
395 }
396}