1#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4pub mod config;
5
6pub use tonic;
7
8use anyhow::Context;
9use config::GrpcConfig;
10use http::Request;
11use spring::{
12 app::AppBuilder,
13 config::ConfigRegistry,
14 error::Result,
15 plugin::{component::ComponentRef, ComponentRegistry, MutableComponentRegistry, Plugin},
16 App,
17};
18use std::{convert::Infallible, net::SocketAddr, sync::Arc};
19use tonic::{
20 async_trait,
21 body::Body,
22 server::NamedService,
23 service::{Routes, RoutesBuilder},
24 transport::Server,
25};
26use tower::Service;
27
28pub trait GrpcConfigurator {
30 fn add_service<S>(&mut self, service: S) -> &mut Self
32 where
33 S: Service<Request<Body>, Error = Infallible>
34 + NamedService
35 + Clone
36 + Send
37 + Sync
38 + 'static,
39 S::Response: axum::response::IntoResponse,
40 S::Future: Send + 'static;
41}
42
43impl GrpcConfigurator for AppBuilder {
44 fn add_service<S>(&mut self, svc: S) -> &mut Self
45 where
46 S: Service<Request<Body>, Error = Infallible>
47 + NamedService
48 + Clone
49 + Send
50 + Sync
51 + 'static,
52 S::Response: axum::response::IntoResponse,
53 S::Future: Send + 'static,
54 {
55 if let Some(routes) = self.get_component_ref::<RoutesBuilder>() {
56 unsafe {
57 let raw_ptr = ComponentRef::into_raw(routes);
58 let routes = &mut *(raw_ptr as *mut RoutesBuilder);
59 routes.add_service(svc);
60 }
61 self
62 } else {
63 let mut route_builder = Routes::builder();
64 route_builder.add_service(svc);
65 self.add_component(route_builder)
66 }
67 }
68}
69
70pub struct GrpcPlugin;
72
73#[async_trait]
74impl Plugin for GrpcPlugin {
75 async fn build(&self, app: &mut AppBuilder) {
76 let config = app
77 .get_config::<GrpcConfig>()
78 .expect("grpc plugin config load failed");
79
80 app.add_scheduler(move |app| Box::new(Self::schedule(app, config)));
81 }
82}
83
84impl GrpcPlugin {
85 async fn schedule(app: Arc<App>, config: GrpcConfig) -> Result<String> {
86 let routes_builder = app.get_component::<RoutesBuilder>();
88
89 let routes = if let Some(routes_builder) = routes_builder {
90 routes_builder.routes()
91 } else {
92 return Ok(
93 "The grpc plugin does not register any routes, so no scheduling is performed"
94 .to_string(),
95 );
96 };
97
98 let mut server = Server::builder()
99 .accept_http1(config.accept_http1)
100 .http2_adaptive_window(config.http2_adaptive_window)
101 .http2_keepalive_interval(config.http2_keepalive_interval)
102 .http2_keepalive_timeout(config.http2_keepalive_timeout)
103 .http2_max_header_list_size(config.http2_max_header_list_size)
104 .http2_max_pending_accept_reset_streams(config.http2_max_pending_accept_reset_streams)
105 .initial_connection_window_size(config.initial_connection_window_size)
106 .initial_stream_window_size(config.initial_stream_window_size)
107 .max_concurrent_streams(config.max_concurrent_streams)
108 .max_frame_size(config.max_frame_size)
109 .tcp_keepalive(config.tcp_keepalive)
110 .tcp_nodelay(config.tcp_nodelay);
111
112 if let Some(max_connection_age) = config.max_connection_age {
113 server = server.max_connection_age(max_connection_age);
114 }
115 if let Some(timeout) = config.timeout {
116 server = server.timeout(timeout);
117 }
118 if let Some(concurrency_limit_per_connection) = config.concurrency_limit_per_connection {
119 server = server.concurrency_limit_per_connection(concurrency_limit_per_connection);
120 }
121
122 server = Self::apply_middleware(server);
123
124 let addr = SocketAddr::new(config.binding, config.port);
125 tracing::info!("tonic grpc service bind tcp listener: {}", addr);
126
127 let router = server.add_routes(routes);
128 if config.graceful {
129 router
130 .serve_with_shutdown(addr, shutdown_signal())
131 .await
132 .with_context(|| format!("bind tcp listener failed:{addr}"))?;
133 } else {
134 router
135 .serve(addr)
136 .await
137 .with_context(|| format!("bind tcp listener failed:{addr}"))?;
138 }
139 Ok("tonic server schedule finished".to_string())
140 }
141
142 fn apply_middleware(_server: Server) -> Server {
143 _server
145 }
146}
147
148async fn shutdown_signal() {
149 let ctrl_c = async {
150 tokio::signal::ctrl_c()
151 .await
152 .expect("failed to install Ctrl+C handler");
153 };
154
155 #[cfg(unix)]
156 let terminate = async {
157 tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
158 .expect("failed to install signal handler")
159 .recv()
160 .await;
161 };
162
163 #[cfg(not(unix))]
164 let terminate = std::future::pending::<()>();
165
166 tokio::select! {
167 _ = ctrl_c => {
168 tracing::info!("Received Ctrl+C signal, waiting for tonic grpc server shutdown")
169 },
170 _ = terminate => {
171 tracing::info!("Received kill signal, waiting for tonic grpc server shutdown")
172 },
173 }
174}