spring_grpc/
lib.rs

1//! [![spring-rs](https://img.shields.io/github/stars/spring-rs/spring-rs)](https://spring-rs.github.io/docs/plugins/spring-grpc)
2#![doc(html_favicon_url = "https://spring-rs.github.io/favicon.ico")]
3#![doc(html_logo_url = "https://spring-rs.github.io/logo.svg")]
4pub mod config;
5
6pub use tonic;
7
8use anyhow::Context;
9use config::GrpcConfig;
10use http::Request;
11use spring::{
12    app::AppBuilder,
13    config::ConfigRegistry,
14    error::Result,
15    plugin::{component::ComponentRef, ComponentRegistry, MutableComponentRegistry, Plugin},
16    App,
17};
18use std::{convert::Infallible, net::SocketAddr, sync::Arc};
19use tonic::{
20    async_trait,
21    body::Body,
22    server::NamedService,
23    service::{Routes, RoutesBuilder},
24    transport::Server,
25};
26use tower::Service;
27
28/// Grpc Configurator
29pub trait GrpcConfigurator {
30    /// add grpc service to app registry
31    fn add_service<S>(&mut self, service: S) -> &mut Self
32    where
33        S: Service<Request<Body>, Error = Infallible>
34            + NamedService
35            + Clone
36            + Send
37            + Sync
38            + 'static,
39        S::Response: axum::response::IntoResponse,
40        S::Future: Send + 'static;
41}
42
43impl GrpcConfigurator for AppBuilder {
44    fn add_service<S>(&mut self, svc: S) -> &mut Self
45    where
46        S: Service<Request<Body>, Error = Infallible>
47            + NamedService
48            + Clone
49            + Send
50            + Sync
51            + 'static,
52        S::Response: axum::response::IntoResponse,
53        S::Future: Send + 'static,
54    {
55        if let Some(routes) = self.get_component_ref::<RoutesBuilder>() {
56            unsafe {
57                let raw_ptr = ComponentRef::into_raw(routes);
58                let routes = &mut *(raw_ptr as *mut RoutesBuilder);
59                routes.add_service(svc);
60            }
61            self
62        } else {
63            let mut route_builder = Routes::builder();
64            route_builder.add_service(svc);
65            self.add_component(route_builder)
66        }
67    }
68}
69
/// gRPC plugin definition.
///
/// A stateless marker type: its `Plugin` implementation loads the gRPC
/// configuration and registers the scheduler that runs the tonic server.
#[derive(Debug, Clone, Copy, Default)]
pub struct GrpcPlugin;
72
73#[async_trait]
74impl Plugin for GrpcPlugin {
75    async fn build(&self, app: &mut AppBuilder) {
76        let config = app
77            .get_config::<GrpcConfig>()
78            .expect("grpc plugin config load failed");
79
80        app.add_scheduler(move |app| Box::new(Self::schedule(app, config)));
81    }
82}
83
84impl GrpcPlugin {
85    async fn schedule(app: Arc<App>, config: GrpcConfig) -> Result<String> {
86        // Get the router in the final schedule step
87        let routes_builder = app.get_component::<RoutesBuilder>();
88
89        let routes = if let Some(routes_builder) = routes_builder {
90            routes_builder.routes()
91        } else {
92            return Ok(
93                "The grpc plugin does not register any routes, so no scheduling is performed"
94                    .to_string(),
95            );
96        };
97
98        let mut server = Server::builder()
99            .accept_http1(config.accept_http1)
100            .http2_adaptive_window(config.http2_adaptive_window)
101            .http2_keepalive_interval(config.http2_keepalive_interval)
102            .http2_keepalive_timeout(config.http2_keepalive_timeout)
103            .http2_max_header_list_size(config.http2_max_header_list_size)
104            .http2_max_pending_accept_reset_streams(config.http2_max_pending_accept_reset_streams)
105            .initial_connection_window_size(config.initial_connection_window_size)
106            .initial_stream_window_size(config.initial_stream_window_size)
107            .max_concurrent_streams(config.max_concurrent_streams)
108            .max_frame_size(config.max_frame_size)
109            .tcp_keepalive(config.tcp_keepalive)
110            .tcp_nodelay(config.tcp_nodelay);
111
112        if let Some(max_connection_age) = config.max_connection_age {
113            server = server.max_connection_age(max_connection_age);
114        }
115        if let Some(timeout) = config.timeout {
116            server = server.timeout(timeout);
117        }
118        if let Some(concurrency_limit_per_connection) = config.concurrency_limit_per_connection {
119            server = server.concurrency_limit_per_connection(concurrency_limit_per_connection);
120        }
121
122        server = Self::apply_middleware(server);
123
124        let addr = SocketAddr::new(config.binding, config.port);
125        tracing::info!("tonic grpc service bind tcp listener: {}", addr);
126
127        let router = server.add_routes(routes);
128        if config.graceful {
129            router
130                .serve_with_shutdown(addr, shutdown_signal())
131                .await
132                .with_context(|| format!("bind tcp listener failed:{addr}"))?;
133        } else {
134            router
135                .serve(addr)
136                .await
137                .with_context(|| format!("bind tcp listener failed:{addr}"))?;
138        }
139        Ok("tonic server schedule finished".to_string())
140    }
141
142    fn apply_middleware(_server: Server) -> Server {
143        // TODO: add middleware
144        _server
145    }
146}
147
148async fn shutdown_signal() {
149    let ctrl_c = async {
150        tokio::signal::ctrl_c()
151            .await
152            .expect("failed to install Ctrl+C handler");
153    };
154
155    #[cfg(unix)]
156    let terminate = async {
157        tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate())
158            .expect("failed to install signal handler")
159            .recv()
160            .await;
161    };
162
163    #[cfg(not(unix))]
164    let terminate = std::future::pending::<()>();
165
166    tokio::select! {
167        _ = ctrl_c => {
168            tracing::info!("Received Ctrl+C signal, waiting for tonic grpc server shutdown")
169        },
170        _ = terminate => {
171            tracing::info!("Received kill signal, waiting for tonic grpc server shutdown")
172        },
173    }
174}