1use std::{convert::Infallible, future::Future, net::SocketAddr};
2
3use tonic::server::NamedService;
4
5use crate::core::{CoreError, Service, ServiceFuture, ShutdownToken};
6
7pub async fn serve_health_with_shutdown<F>(
12 addr: SocketAddr,
13 shutdown: F,
14) -> Result<(), tonic::transport::Error>
15where
16 F: Future<Output = ()> + Send + 'static,
17{
18 let (_reporter, health_service) = tonic_health::server::health_reporter();
19
20 tonic::transport::Server::builder()
21 .add_service(health_service)
22 .serve_with_shutdown(addr, shutdown)
23 .await
24}
25
26pub struct TonicService<S> {
28 name: String,
29 addr: SocketAddr,
30 service: std::sync::Mutex<Option<S>>,
31}
32
33impl<S> TonicService<S> {
34 pub fn new(name: impl Into<String>, addr: SocketAddr, service: S) -> Self {
36 Self {
37 name: name.into(),
38 addr,
39 service: std::sync::Mutex::new(Some(service)),
40 }
41 }
42
43 pub fn addr(&self) -> SocketAddr {
45 self.addr
46 }
47}
48
49impl<S> Service for TonicService<S>
50where
51 S: tower::Service<
52 http::Request<tonic::body::Body>,
53 Response = http::Response<tonic::body::Body>,
54 Error = Infallible,
55 > + NamedService
56 + Clone
57 + Send
58 + Sync
59 + 'static,
60 S::Future: Send + 'static,
61{
62 fn name(&self) -> &str {
63 &self.name
64 }
65
66 fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
67 Box::pin(async move {
68 let service = self
69 .service
70 .lock()
71 .expect("tonic service mutex")
72 .take()
73 .ok_or_else(|| {
74 CoreError::Service(format!("service {} already started", self.name))
75 })?;
76 tonic::transport::Server::builder()
77 .add_service(service)
78 .serve_with_shutdown(self.addr, async move {
79 shutdown.cancelled().await;
80 })
81 .await
82 .map_err(|error| {
83 CoreError::Service(format!("RPC service {} failed: {error}", self.name))
84 })
85 })
86 }
87}
88
89pub struct TonicHealthService {
91 name: String,
92 addr: SocketAddr,
93}
94
95impl TonicHealthService {
96 pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
98 Self {
99 name: name.into(),
100 addr,
101 }
102 }
103}
104
105impl Service for TonicHealthService {
106 fn name(&self) -> &str {
107 &self.name
108 }
109
110 fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
111 Box::pin(async move {
112 serve_health_with_shutdown(self.addr, async move {
113 shutdown.cancelled().await;
114 })
115 .await
116 .map_err(|error| {
117 CoreError::Service(format!("RPC service {} failed: {error}", self.name))
118 })
119 })
120 }
121}
122
123#[cfg(feature = "resil")]
125#[derive(Debug, Clone)]
126pub struct RpcServerLayerStack {
127 config: crate::rpc::RpcServerConfig,
128 #[cfg(feature = "observability")]
129 metrics: Option<crate::observability::MetricsRegistry>,
130}
131
132#[cfg(feature = "resil")]
133impl RpcServerLayerStack {
134 pub fn new(config: crate::rpc::RpcServerConfig) -> Self {
136 Self {
137 config,
138 #[cfg(feature = "observability")]
139 metrics: None,
140 }
141 }
142
143 pub fn production_defaults(name: impl Into<String>, addr: SocketAddr) -> Self {
145 Self::new(crate::rpc::RpcServerConfig::production_defaults(name, addr))
146 }
147
148 #[allow(deprecated)]
150 #[deprecated(note = "use production_defaults instead")]
151 pub fn go_zero_defaults(name: impl Into<String>, addr: SocketAddr) -> Self {
152 Self::production_defaults(name, addr)
153 }
154
155 #[cfg(feature = "observability")]
157 pub fn with_metrics(mut self, metrics: crate::observability::MetricsRegistry) -> Self {
158 self.metrics = Some(metrics);
159 self
160 }
161
162 pub fn into_layer(self) -> crate::rpc::RpcUnaryResilienceLayer {
164 #[cfg(feature = "observability")]
165 {
166 let mut resilience = crate::rpc::RpcResilienceLayer::new(
167 self.config.name.clone(),
168 self.config.resilience.clone(),
169 );
170 if let Some(metrics) = self.metrics {
171 resilience = resilience.with_metrics(metrics);
172 }
173 crate::rpc::RpcUnaryResilienceLayer::new(resilience)
174 }
175
176 #[cfg(not(feature = "observability"))]
177 {
178 crate::rpc::RpcUnaryResilienceLayer::new(crate::rpc::RpcResilienceLayer::new(
179 self.config.name.clone(),
180 self.config.resilience.clone(),
181 ))
182 }
183 }
184}