Skip to main content

rs_zero/rpc/
server.rs

1use std::{convert::Infallible, future::Future, net::SocketAddr};
2
3use tonic::server::NamedService;
4
5use crate::core::{CoreError, Service, ServiceFuture, ShutdownToken};
6
7/// Starts a tonic health server with graceful shutdown.
8///
9/// This helper gives the MVP a concrete, generated tonic service without
10/// requiring application projects to define protobuf files immediately.
11pub async fn serve_health_with_shutdown<F>(
12    addr: SocketAddr,
13    shutdown: F,
14) -> Result<(), tonic::transport::Error>
15where
16    F: Future<Output = ()> + Send + 'static,
17{
18    let (_reporter, health_service) = tonic_health::server::health_reporter();
19
20    tonic::transport::Server::builder()
21        .add_service(health_service)
22        .serve_with_shutdown(addr, shutdown)
23        .await
24}
25
/// Adapter that lets a single tonic service run as a [`Service`] inside a
/// service group.
///
/// The wrapped service sits in a `Mutex<Option<S>>` slot so that it can be
/// moved out through a shared reference.
pub struct TonicService<S> {
    /// Name reported for this service.
    name: String,
    /// Socket address the server listens on.
    addr: SocketAddr,
    /// Slot holding the wrapped service; `None` once it has been taken.
    service: std::sync::Mutex<Option<S>>,
}

impl<S> TonicService<S> {
    /// Builds an adapter named `name` that will serve `service` on `addr`.
    pub fn new(name: impl Into<String>, addr: SocketAddr, service: S) -> Self {
        let name = name.into();
        let service = std::sync::Mutex::new(Some(service));
        Self {
            name,
            addr,
            service,
        }
    }

    /// Returns the socket address this adapter was configured with.
    pub fn addr(&self) -> SocketAddr {
        self.addr
    }
}
48
49impl<S> Service for TonicService<S>
50where
51    S: tower::Service<
52            http::Request<tonic::body::Body>,
53            Response = http::Response<tonic::body::Body>,
54            Error = Infallible,
55        > + NamedService
56        + Clone
57        + Send
58        + Sync
59        + 'static,
60    S::Future: Send + 'static,
61{
62    fn name(&self) -> &str {
63        &self.name
64    }
65
66    fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
67        Box::pin(async move {
68            let service = self
69                .service
70                .lock()
71                .expect("tonic service mutex")
72                .take()
73                .ok_or_else(|| {
74                    CoreError::Service(format!("service {} already started", self.name))
75                })?;
76            tonic::transport::Server::builder()
77                .add_service(service)
78                .serve_with_shutdown(self.addr, async move {
79                    shutdown.cancelled().await;
80                })
81                .await
82                .map_err(|error| {
83                    CoreError::Service(format!("RPC service {} failed: {error}", self.name))
84                })
85        })
86    }
87}
88
/// Service-group adapter that serves the tonic health-check RPC service.
pub struct TonicHealthService {
    /// Name reported for this service.
    name: String,
    /// Socket address the health server listens on.
    addr: SocketAddr,
}

impl TonicHealthService {
    /// Builds a health service adapter named `name` that listens on `addr`.
    pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
        let name: String = name.into();
        Self { name, addr }
    }
}
104
105impl Service for TonicHealthService {
106    fn name(&self) -> &str {
107        &self.name
108    }
109
110    fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
111        Box::pin(async move {
112            serve_health_with_shutdown(self.addr, async move {
113                shutdown.cancelled().await;
114            })
115            .await
116            .map_err(|error| {
117                CoreError::Service(format!("RPC service {} failed: {error}", self.name))
118            })
119        })
120    }
121}
122
/// Tower-first RPC server layer stack builder.
///
/// Holds the server configuration and, when the `observability` feature is
/// enabled, an optional metrics registry. [`RpcServerLayerStack::into_layer`]
/// turns both into the Tower layer handed to tonic.
#[cfg(feature = "resil")]
#[derive(Debug, Clone)]
pub struct RpcServerLayerStack {
    /// Runtime configuration the layer is built from.
    config: crate::rpc::RpcServerConfig,
    /// Registry attached via `with_metrics`; `None` until one is supplied.
    #[cfg(feature = "observability")]
    metrics: Option<crate::observability::MetricsRegistry>,
}

#[cfg(feature = "resil")]
impl RpcServerLayerStack {
    /// Creates an RPC server layer stack from runtime configuration.
    ///
    /// No metrics registry is attached initially.
    pub fn new(config: crate::rpc::RpcServerConfig) -> Self {
        Self {
            config,
            #[cfg(feature = "observability")]
            metrics: None,
        }
    }

    /// Creates a production-oriented RPC server layer stack.
    ///
    /// The configuration comes from
    /// `RpcServerConfig::production_defaults(name, addr)`.
    pub fn production_defaults(name: impl Into<String>, addr: SocketAddr) -> Self {
        Self::new(crate::rpc::RpcServerConfig::production_defaults(name, addr))
    }

    /// Deprecated alias for [`RpcServerLayerStack::production_defaults`].
    #[deprecated(note = "use production_defaults instead")]
    pub fn go_zero_defaults(name: impl Into<String>, addr: SocketAddr) -> Self {
        Self::production_defaults(name, addr)
    }

    /// Attaches a metrics registry to server-side RPC observation.
    #[cfg(feature = "observability")]
    pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
        self.metrics = Some(metrics);
        self
    }

    /// Returns the Tower layer used by tonic `Server::builder().layer(...)`.
    ///
    /// Consumes the stack. When the `observability` feature is enabled and a
    /// registry was attached, it is forwarded to the resilience layer.
    pub fn into_layer(self) -> crate::rpc::RpcUnaryResilienceLayer {
        // `self` is consumed here, so the configuration fields are moved
        // into the layer instead of being cloned.
        let resilience =
            crate::rpc::RpcResilienceLayer::new(self.config.name, self.config.resilience);

        #[cfg(feature = "observability")]
        let resilience = match self.metrics {
            Some(metrics) => resilience.with_metrics(metrics),
            None => resilience,
        };

        crate::rpc::RpcUnaryResilienceLayer::new(resilience)
    }
}