Skip to main content

rs_zero/rpc/
server.rs

1use std::{convert::Infallible, future::Future, net::SocketAddr};
2
3use tonic::server::NamedService;
4
5use crate::core::{CoreError, Service, ServiceFuture, ShutdownToken};
6
7/// Starts a tonic health server with graceful shutdown.
8///
9/// This helper gives the MVP a concrete, generated tonic service without
10/// requiring application projects to define protobuf files immediately.
11pub async fn serve_health_with_shutdown<F>(
12    addr: SocketAddr,
13    shutdown: F,
14) -> Result<(), tonic::transport::Error>
15where
16    F: Future<Output = ()> + Send + 'static,
17{
18    let (_reporter, health_service) = tonic_health::server::health_reporter();
19
20    tonic::transport::Server::builder()
21        .add_service(health_service)
22        .serve_with_shutdown(addr, shutdown)
23        .await
24}
25
26/// [`Service`] adapter for running one tonic service inside a service group.
27pub struct TonicService<S> {
28    name: String,
29    addr: SocketAddr,
30    service: std::sync::Mutex<Option<S>>,
31}
32
33impl<S> TonicService<S> {
34    /// Creates a tonic service-group adapter.
35    pub fn new(name: impl Into<String>, addr: SocketAddr, service: S) -> Self {
36        Self {
37            name: name.into(),
38            addr,
39            service: std::sync::Mutex::new(Some(service)),
40        }
41    }
42
43    /// Returns the configured listen address.
44    pub fn addr(&self) -> SocketAddr {
45        self.addr
46    }
47}
48
49impl<S> Service for TonicService<S>
50where
51    S: tower::Service<
52            http::Request<tonic::body::Body>,
53            Response = http::Response<tonic::body::Body>,
54            Error = Infallible,
55        > + NamedService
56        + Clone
57        + Send
58        + Sync
59        + 'static,
60    S::Future: Send + 'static,
61{
62    fn name(&self) -> &str {
63        &self.name
64    }
65
66    fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
67        Box::pin(async move {
68            let service = self
69                .service
70                .lock()
71                .expect("tonic service mutex")
72                .take()
73                .ok_or_else(|| {
74                    CoreError::Service(format!("service {} already started", self.name))
75                })?;
76            tonic::transport::Server::builder()
77                .add_service(service)
78                .serve_with_shutdown(self.addr, async move {
79                    shutdown.cancelled().await;
80                })
81                .await
82                .map_err(|error| {
83                    CoreError::Service(format!("RPC service {} failed: {error}", self.name))
84                })
85        })
86    }
87}
88
89/// Health-check RPC service adapter for service groups.
90pub struct TonicHealthService {
91    name: String,
92    addr: SocketAddr,
93}
94
95impl TonicHealthService {
96    /// Creates a tonic health service adapter.
97    pub fn new(name: impl Into<String>, addr: SocketAddr) -> Self {
98        Self {
99            name: name.into(),
100            addr,
101        }
102    }
103}
104
105impl Service for TonicHealthService {
106    fn name(&self) -> &str {
107        &self.name
108    }
109
110    fn start(&self, shutdown: ShutdownToken) -> ServiceFuture<'_> {
111        Box::pin(async move {
112            serve_health_with_shutdown(self.addr, async move {
113                shutdown.cancelled().await;
114            })
115            .await
116            .map_err(|error| {
117                CoreError::Service(format!("RPC service {} failed: {error}", self.name))
118            })
119        })
120    }
121}