1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
mod error;
use error::Error;
use http::{Request, Response};
use hyper::Body;
use std::clone::Clone;
use std::env;
use std::marker::Send;
use tonic::body::BoxBody;
use tonic::transport::NamedService;
use tower::Service;
pub trait PluginService:
Service<Request<Body>, Response = Response<BoxBody>> + NamedService + Clone + Send + 'static
{
}
const GRPC_CORE_PROTOCOL_VERSION: usize = 1;
const LOCALHOST_BIND_ADDR: &str = "0.0.0.0";
const LOCALHOST_ADVERTISE_ADDR: &str = "127.0.0.1";
const LOG_PREFIX: &str = "GrrPlugin::Server: ";
pub struct HandshakeConfig {
pub magic_cookie_key: String,
pub magic_cookie_value: String,
}
pub struct Server {
handshake_config: HandshakeConfig,
protocol_version: u32,
}
impl Server {
pub fn new(protocol_version: u32, handshake_config: HandshakeConfig) -> Server {
Server {
handshake_config,
protocol_version,
}
}
fn validate_magic_cookie(&self) -> Result<(), Error> {
log::info!("{}Validating the magic environment cookies to conduct the handshake. Expecting environment variable {}={}.",LOG_PREFIX, self.handshake_config.magic_cookie_key, self.handshake_config.magic_cookie_value);
match env::var(&self.handshake_config.magic_cookie_key) {
Ok(value) => {
if value == self.handshake_config.magic_cookie_value {
log::info!("{}Handshake succeeded!", LOG_PREFIX);
return Ok(());
} else {
log::error!("{}Handshake failed due to environment variable {}'s value being {}, but expected to be {}.", LOG_PREFIX,self.handshake_config.magic_cookie_key, value, self.handshake_config.magic_cookie_value);
}
}
Err(e) => log::error!(
"{}Handshake failed due to error reading environment variable {}: {:?}",
LOG_PREFIX,
self.handshake_config.magic_cookie_key,
e
),
}
Err(Error::GRPCHandshakeMagicCookieValueMismatch)
}
pub async fn serve<S: PluginService>(&self, service: S) -> Result<(), Error>
where
<S as Service<http::Request<hyper::Body>>>::Future: Send + 'static,
<S as Service<http::Request<hyper::Body>>>::Error:
Into<Box<dyn std::error::Error + Send + Sync>> + Send,
{
log::info!("{}Serving over a Tcp Socket...", LOG_PREFIX);
log_and_escalate!(self.validate_magic_cookie());
let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
health_reporter.set_serving::<S>().await;
log::info!("{}gRPC Health Service created.", LOG_PREFIX);
let port = match portpicker::pick_unused_port() {
Some(p) => p,
None => {
return Err(Error::Generic(
"Unable to find a free unused TCP port to bind the gRPC server to".to_string(),
));
}
};
log::info!("{}Picked port: {}", LOG_PREFIX, port);
let addrstr = format!("{}:{}", LOCALHOST_BIND_ADDR, port);
let addr = log_and_escalate!(addrstr.parse());
let handshakestr = format!(
"{}|{}|tcp|{}:{}|grpc|",
GRPC_CORE_PROTOCOL_VERSION, self.protocol_version, LOCALHOST_ADVERTISE_ADDR, port
);
log::info!(
"{}About to print Handshake string: {}",
LOG_PREFIX,
handshakestr
);
println!("{}", handshakestr);
log::info!("About to begin serving....");
log_and_escalate!(
tonic::transport::Server::builder()
.add_service(health_service)
.add_service(service)
.serve(addr)
.await
);
log::info!("{}Serving ended! Plugin about to shut down.", LOG_PREFIX,);
Ok(())
}
}