avalanche_types/subnet/rpc/utils/
grpc.rs1use crate::proto::google::protobuf::Timestamp;
2
3use std::{
4 convert::Infallible,
5 io::{Error, ErrorKind, Result},
6 net::SocketAddr,
7 time::Duration,
8};
9
10use chrono::{DateTime, Utc};
11use futures::FutureExt;
12use http::{Request, Response};
13use hyper::Body;
14use tokio::sync::broadcast::Receiver;
15use tonic::{
16 body::BoxBody,
17 server::NamedService,
18 transport::{Channel, Endpoint},
19};
20use tower_service::Service;
21
22pub const DEFAULT_MAX_CONCURRENT_STREAMS: u32 = u32::MAX;
31
32pub const DEFAULT_KEEP_ALIVE_TIMEOUT: Duration = Duration::from_secs(20);
39
40pub const DEFAULT_KEEP_ALIVE_INTERVAL: Duration = Duration::from_secs(2 * 60 * 60);
50
51pub const DEFAULT_KEEP_ALIVE_MIN_TIME: Duration = Duration::from_secs(5);
60
61pub fn default_server() -> tonic::transport::Server {
63 tonic::transport::Server::builder()
64 .max_concurrent_streams(DEFAULT_MAX_CONCURRENT_STREAMS)
65 .http2_keepalive_timeout(Some(DEFAULT_KEEP_ALIVE_TIMEOUT))
66 .http2_keepalive_interval(Some(DEFAULT_KEEP_ALIVE_INTERVAL))
67 .tcp_keepalive(Some(DEFAULT_KEEP_ALIVE_MIN_TIME))
68}
69
70pub fn default_client(endpoint: &str) -> Result<Endpoint> {
73 let endpoint = Channel::from_shared(format!("http://{endpoint}"))
74 .map_err(|e| Error::new(ErrorKind::Other, format!("invalid endpoint: {e}")))?
75 .keep_alive_timeout(DEFAULT_KEEP_ALIVE_TIMEOUT)
76 .http2_keep_alive_interval(DEFAULT_KEEP_ALIVE_INTERVAL)
77 .tcp_keepalive(Some(DEFAULT_KEEP_ALIVE_MIN_TIME));
78
79 Ok(endpoint)
80}
81
82pub struct Server {
84 pub stop_ch: Receiver<()>,
86
87 pub addr: SocketAddr,
89}
90
91impl Server {
92 pub fn new(addr: SocketAddr, stop_ch: Receiver<()>) -> Self {
93 Self { stop_ch, addr }
94 }
95}
96
97impl Server {
99 pub fn serve<S>(mut self, svc: S) -> Result<()>
102 where
103 S: Service<Request<Body>, Response = Response<BoxBody>, Error = Infallible>
104 + NamedService
105 + Clone
106 + Send
107 + 'static,
108 S::Future: Send + 'static,
109 {
110 tokio::spawn(async move {
111 default_server()
112 .add_service(svc)
113 .serve_with_shutdown(self.addr, self.stop_ch.recv().map(|_| ()))
114 .await
115 .map_err(|e| Error::new(ErrorKind::Other, format!("grpc server failed: {:?}", e)))
116 });
117 log::info!("gRPC server started: {}", self.addr);
118
119 Ok(())
120 }
121}
122
123pub fn timestamp_from_time(dt: &DateTime<Utc>) -> Timestamp {
125 Timestamp {
126 seconds: dt.timestamp(),
127 nanos: dt.timestamp_subsec_nanos() as i32,
128 }
129}