1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
use std::{
io::{Error, ErrorKind, Result},
net::SocketAddr,
};
use crate::{
proto::{
grpcutil,
pb::{
self,
vm::vm_server::{Vm, VmServer},
},
PROTOCOL_VERSION,
},
subnet::rpc::utils,
};
use jsonrpc_core::futures::FutureExt;
use tokio::sync::broadcast::Receiver;
use tonic::transport::server::NamedService;
use tonic_health::server::health_reporter;
/// Settings used to build the handshake line that `serve_with_address`
/// prints to stdout before the gRPC server starts.
struct HandshakeConfig {
    /// Protocol version string embedded in the handshake line.
    protocol_version: &'static str,
}

impl HandshakeConfig {
    /// Creates a configuration that carries the crate-wide `PROTOCOL_VERSION`.
    pub fn new() -> Self {
        let protocol_version = PROTOCOL_VERSION;
        HandshakeConfig { protocol_version }
    }
}
/// Marker type that carries the plugin's gRPC service name.
/// `serve_with_address` passes it to the health reporter's `set_serving`.
struct Plugin;
impl NamedService for Plugin {
// Service name associated with this marker type.
const NAME: &'static str = "plugin";
}
/// Serves `vm` over gRPC on an address obtained from `utils::new_socket_addr`.
///
/// This is a convenience wrapper around [`serve_with_address`]; the result of
/// that call is returned unchanged.
///
/// # Errors
///
/// Propagates any error returned by [`serve_with_address`].
pub async fn serve<V>(vm: V, stop_ch: Receiver<()>) -> Result<()>
where
    V: Vm,
{
    serve_with_address(vm, utils::new_socket_addr(), stop_ch).await
}
/// Serves `vm` over gRPC on `addr` until `stop_ch.recv()` resolves.
///
/// Alongside the VM service, the server exposes a health service (with
/// [`Plugin`] marked as serving) and a reflection service built from the
/// registered file descriptor sets. Before the server starts, a handshake
/// line of the form `1|<protocol version>|tcp|<addr>|grpc|` is printed to
/// stdout.
///
/// # Errors
///
/// Returns an [`ErrorKind::Other`] error when the reflection service cannot
/// be built, or when the gRPC server stops with a failure.
pub async fn serve_with_address<V>(
    vm: V,
    addr: SocketAddr,
    mut stop_ch: Receiver<()>,
) -> Result<()>
where
    V: Vm,
{
    // Wraps a message into the `ErrorKind::Other` I/O error this function returns.
    fn other_error(message: String) -> Error {
        Error::new(ErrorKind::Other, message)
    }

    // Health service: mark the plugin as serving before anything else is set up.
    let (mut reporter, health_service) = health_reporter();
    reporter.set_serving::<Plugin>().await;

    // Reflection service covering every descriptor set the server exposes.
    let reflection_builder = tonic_reflection::server::Builder::configure()
        .register_encoded_file_descriptor_set(pb::rpcdb::FILE_DESCRIPTOR_SET)
        .register_encoded_file_descriptor_set(pb::vm::FILE_DESCRIPTOR_SET)
        .register_encoded_file_descriptor_set(pb::google::protobuf::FILE_DESCRIPTOR_SET)
        .register_encoded_file_descriptor_set(pb::io::prometheus::client::FILE_DESCRIPTOR_SET)
        .register_encoded_file_descriptor_set(
            tonic_health::proto::GRPC_HEALTH_V1_FILE_DESCRIPTOR_SET,
        );
    let reflection = match reflection_builder.build() {
        Ok(service) => service,
        Err(e) => {
            return Err(other_error(format!(
                "failed to create gRPC reflection service: {:?}",
                e
            )));
        }
    };

    log::info!("plugin listening on address {:?}", addr);

    // The handshake line goes to stdout (not the logger) before serving begins.
    let protocol_version = HandshakeConfig::new().protocol_version;
    let handshake_line = format!("1|{}|tcp|{}|grpc|", protocol_version, addr);
    println!("{}", handshake_line);

    // Whatever `recv` yields (message or error) is discarded; it only acts as
    // the shutdown trigger.
    let shutdown_signal = stop_ch.recv().map(|_| ());

    let served = grpcutil::default_server()
        .add_service(health_service)
        .add_service(reflection)
        .add_service(VmServer::new(vm))
        .serve_with_shutdown(addr, shutdown_signal)
        .await;
    if let Err(e) = served {
        return Err(other_error(format!("grpc server failed: {:?}", e)));
    }

    log::info!("grpc server shutdown complete: {}", addr);
    Ok(())
}