1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
// A go-plugin Server to write Rust-based plugins to Golang.

mod error;

use error::Error;
use http::{Request, Response};
use hyper::Body;
use std::clone::Clone;
use std::env;
use std::marker::Send;
use tonic::body::BoxBody;
use tonic::transport::NamedService;
use tower::Service;

pub trait PluginService:
    Service<Request<Body>, Response = Response<BoxBody>> + NamedService + Clone + Send + 'static
{
}

// The constants are for generating the go-plugin string
// https://github.com/hashicorp/go-plugin/blob/master/docs/guide-plugin-write-non-go.md
const GRPC_CORE_PROTOCOL_VERSION: usize = 1;

/// Golang/go-plugin don't support IPV6 yet. Yes, yes, I know...
// bind to ALL addresses on Localhost
const LOCALHOST_BIND_ADDR: &str = "0.0.0.0";

// How should other processes on the localhost address localhost?
const LOCALHOST_ADVERTISE_ADDR: &str = "127.0.0.1";

const LOG_PREFIX: &str = "GrrPlugin::Server: ";

pub struct HandshakeConfig {
    pub magic_cookie_key: String,
    pub magic_cookie_value: String,
}

pub struct Server {
    handshake_config: HandshakeConfig,
    protocol_version: u32,
}

impl Server {
    pub fn new(protocol_version: u32, handshake_config: HandshakeConfig) -> Server {
        Server {
            handshake_config,
            protocol_version,
        }
    }

    // Copied from: https://github.com/hashicorp/go-plugin/blob/master/server.go#L247
    fn validate_magic_cookie(&self) -> Result<(), Error> {
        log::info!("{}Validating the magic environment cookies to conduct the handshake. Expecting environment variable {}={}.",LOG_PREFIX, self.handshake_config.magic_cookie_key, self.handshake_config.magic_cookie_value);
        match env::var(&self.handshake_config.magic_cookie_key) {
            Ok(value) => {
                if value == self.handshake_config.magic_cookie_value {
                    log::info!("{}Handshake succeeded!", LOG_PREFIX);
                    return Ok(());
                } else {
                    log::error!("{}Handshake failed due to environment variable {}'s value being {}, but expected to be {}.", LOG_PREFIX,self.handshake_config.magic_cookie_key, value, self.handshake_config.magic_cookie_value);
                }
            }
            Err(e) => log::error!(
                "{}Handshake failed due to error reading environment variable {}: {:?}",
                LOG_PREFIX,
                self.handshake_config.magic_cookie_key,
                e
            ),
        }

        Err(Error::GRPCHandshakeMagicCookieValueMismatch)
    }

    pub async fn serve<S: PluginService>(&self, service: S) -> Result<(), Error>
    where
        <S as Service<http::Request<hyper::Body>>>::Future: Send + 'static,
        <S as Service<http::Request<hyper::Body>>>::Error:
            Into<Box<dyn std::error::Error + Send + Sync>> + Send,
    {
        log::info!("{}Serving over a Tcp Socket...", LOG_PREFIX);

        log_and_escalate!(self.validate_magic_cookie());

        let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
        health_reporter.set_serving::<S>().await;
        log::info!("{}gRPC Health Service created.", LOG_PREFIX);

        let port = match portpicker::pick_unused_port() {
            Some(p) => p,
            None => {
                return Err(Error::Generic(
                    "Unable to find a free unused TCP port to bind the gRPC server to".to_string(),
                ));
            }
        };

        log::info!("{}Picked port: {}", LOG_PREFIX, port);

        let addrstr = format!("{}:{}", LOCALHOST_BIND_ADDR, port);
        let addr = log_and_escalate!(addrstr.parse());

        let handshakestr = format!(
            "{}|{}|tcp|{}:{}|grpc|",
            GRPC_CORE_PROTOCOL_VERSION, self.protocol_version, LOCALHOST_ADVERTISE_ADDR, port
        );

        log::info!(
            "{}About to print Handshake string: {}",
            LOG_PREFIX,
            handshakestr
        );
        println!("{}", handshakestr);

        log::info!("About to begin serving....");
        log_and_escalate!(
            tonic::transport::Server::builder()
                .add_service(health_service)
                .add_service(service)
                .serve(addr)
                .await
        );

        log::info!("{}Serving ended! Plugin about to shut down.", LOG_PREFIX,);

        Ok(())
    }
}