avalanche_types/subnet/rpc/utils/
grpc.rs

1use crate::proto::google::protobuf::Timestamp;
2
3use std::{
4    convert::Infallible,
5    io::{Error, ErrorKind, Result},
6    net::SocketAddr,
7    time::Duration,
8};
9
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use http::{Request, Response};
13use hyper::Body;
14use tokio::sync::broadcast::Receiver;
15use tonic::{
16    body::BoxBody,
17    server::NamedService,
18    transport::{Channel, Endpoint},
19};
20use tower_service::Service;
21
// gRPC defaults.

/// Value used for the HTTP2 [`SETTINGS_MAX_CONCURRENT_STREAMS`][spec] option
/// on server connections.
///
/// Tonic itself applies no limit by default (`None`), which is equivalent to
/// `u32::MAX`; this constant spells that default out explicitly.
///
/// [spec]: https://http2.github.io/http2-spec/#SETTINGS_MAX_CONCURRENT_STREAMS
pub const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = u32::MAX;
31
/// How long to wait for the acknowledgement of a keepalive ping.
///
/// When a ping is not acknowledged within this window the connection is
/// closed. The setting has no effect while the HTTP2 keepalive interval is
/// disabled.
///
/// Matches the Tonic default of 20 seconds.
pub const DEFAULT_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
39
/// Time between HTTP2 Ping frames on accepted connections.
///
/// Tonic takes this setting as an `Option`: `None` turns HTTP2 keepalive off,
/// while `Some(interval)` sends a Ping frame every `interval`. How long to
/// wait for the ping acknowledgement is configured separately through
/// `tonic::transport::Server::http2_keepalive_timeout`.
///
/// Tonic default: HTTP2 keepalive disabled (`None`).
/// Avalanche default: 2 hours.
pub const DEFAULT_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2 * 3_600);
50
/// Idle time before TCP keepalive probes are sent on accepted connections.
///
/// Tonic takes this setting as an `Option`: `None` disables TCP keepalive,
/// while `Some(idle)` starts probing once a connection has been idle for
/// `idle`.
///
/// Tonic default: TCP keepalive disabled (`None`).
/// Avalanche default: 5 seconds.
pub const DEFAULT_KEEP_ALIVE_MIN_TIME: Duration = Duration::from_secs(5);
60
61/// Creates a tonic gRPC server with avalanche defaults.
62pub fn default_server() -> tonic::transport::Server {
63    tonic::transport::Server::builder()
64        .max_concurrent_streams(DEFAULT_MAX_CONCURRENT_STREAMS)
65        .http2_keepalive_timeout(Some(DEFAULT_KEEP_ALIVE_TIMEOUT))
66        .http2_keepalive_interval(Some(DEFAULT_KEEP_ALIVE_INTERVAL))
67        .tcp_keepalive(Some(DEFAULT_KEEP_ALIVE_MIN_TIME))
68}
69
70/// Creates a tonic Endpoint with avalanche defaults. The endpoint input is
71/// expected in `<ip>:<port>` format.
72pub fn default_client(endpoint: &str) -> Result<Endpoint> {
73    let endpoint = Channel::from_shared(format!("http://{endpoint}"))
74        .map_err(|e| Error::new(ErrorKind::Other, format!("invalid endpoint: {e}")))?
75        .keep_alive_timeout(DEFAULT_KEEP_ALIVE_TIMEOUT)
76        .http2_keep_alive_interval(DEFAULT_KEEP_ALIVE_INTERVAL)
77        .tcp_keepalive(Some(DEFAULT_KEEP_ALIVE_MIN_TIME));
78
79    Ok(endpoint)
80}
81
82/// Server is a gRPC server lifecycle manager.
83pub struct Server {
84    /// Waits for the broadcasted stop signal to shutdown gRPC server.
85    pub stop_ch: Receiver<()>,
86
87    /// Server address.
88    pub addr: SocketAddr,
89}
90
91impl Server {
92    pub fn new(addr: SocketAddr, stop_ch: Receiver<()>) -> Self {
93        Self { stop_ch, addr }
94    }
95}
96
97// TODO: add support for multiple services.
98impl Server {
99    /// Attempts to start a gRPC server for the provided service which can be
100    /// shutdown by a broadcast channel.
101    pub fn serve<S>(mut self, svc: S) -> Result<()>
102    where
103        S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
104            + NamedService
105            + Clone
106            + Send
107            + 'static,
108        S::Future: Send + 'static,
109    {
110        tokio::spawn(async move {
111            default_server()
112                .add_service(svc)
113                .serve_with_shutdown(self.addr, self.stop_ch.recv().map(|_| ()))
114                .await
115                .map_err(|e| Error::new(ErrorKind::Other, format!("grpc server failed: {:?}", e)))
116        });
117        log::info!("gRPC server started: {}", self.addr);
118
119        Ok(())
120    }
121}
122
123/// Converts DataTime to a google::protobuf::Timestamp
124pub fn timestamp_from_time(dt: &DateTime<Utc>) -> Timestamp {
125    Timestamp {
126        seconds: dt.timestamp(),
127        nanos: dt.timestamp_subsec_nanos() as i32,
128    }
129}