grr_plugin/
lib.rs

// A go-plugin server for writing plugins in Rust that are consumed by Golang hosts.
2
3pub mod error;
4mod grpc_broker;
5mod grpc_broker_service;
6mod grpc_controller;
7mod grpc_stdio;
8mod unique_port;
9pub mod unix;
10
11use error::Error;
12
13use anyhow::{anyhow, Context, Result};
14use http::{Request, Response};
15use hyper::Body;
16use std::clone::Clone;
17use std::env;
18use std::marker::Send;
19use tonic::body::BoxBody;
20use tonic::transport::NamedService;
21use tower::Service;
22use unix::TempSocket;
23
24pub use grpc_broker::GRpcBroker;
25pub use grpc_broker_service::grpc_plugins::ConnInfo;
26pub use tonic::{Status, Streaming};
27
28use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
29
/// Numeric service identifier; a plain alias for `u32`.
// NOTE(review): presumably identifies a brokered service/stream — confirm against `grpc_broker`.
pub type ServiceId = u32;
31
/// Core protocol version placed in the first field of the go-plugin handshake line.
///
/// See <https://github.com/hashicorp/go-plugin/blob/master/docs/guide-plugin-write-non-go.md>
const GRPC_CORE_PROTOCOL_VERSION: usize = 1;
35
/// Magic-cookie pair used for the go-plugin handshake.
///
/// Before serving, the server reads the environment variable named
/// `magic_cookie_key` and requires its value to equal `magic_cookie_value`.
#[derive(Debug, Clone, PartialEq, Eq)]
pub struct HandshakeConfig {
    /// Name of the environment variable that carries the cookie.
    pub magic_cookie_key: String,
    /// Value that environment variable must hold for the handshake to succeed.
    pub magic_cookie_value: String,
}
40
41pub struct Server {
42    handshake_config: HandshakeConfig,
43    protocol_version: u32,
44    outgoing_conninfo_sender_receiver: UnboundedReceiver<UnboundedSender<Result<ConnInfo, Status>>>,
45    outgoing_conninfo_receiver_receiver:
46        UnboundedReceiver<UnboundedReceiver<Result<ConnInfo, Status>>>,
47    incoming_conninfo_stream_sender: UnboundedSender<Streaming<ConnInfo>>,
48    incoming_conninfo_stream_receiver_receiver:
49        UnboundedReceiver<UnboundedReceiver<Streaming<ConnInfo>>>,
50    trigger: triggered::Trigger,
51    listener: triggered::Listener,
52}
53
54impl Server {
55    pub fn new(protocol_version: u32, handshake_config: HandshakeConfig) -> Result<Server, Error> {
56        // This channel sends conninfo from the plugin/server side (the sender will be vended to the JsonRPCBroker who will send new
57        // ConnInfo's as new services/handlers are launched) to the host/client side (through the gRPCBroker's start_stream call)
58        // where the host/client will process them
59        let (outgoing_conninfo_sender, outgoing_conninfo_receiver) =
60            unbounded_channel::<Result<ConnInfo, Status>>();
61
62        // Use this channel to send the channel transmitter above from this constructor
63        // to where it will be consumed in the "jsonrpc_broker" method later...
64        // using channels avoids having to do a complex sync dance using mutable globals.
65        let (outgoing_conninfo_sender_transmitter, outgoing_conninfo_sender_receiver) =
66            unbounded_channel();
67        outgoing_conninfo_sender_transmitter.send(outgoing_conninfo_sender).context("Unable to send the outgoing_conninfo_sender to the transmitter. This is a tokio mpsc channel's transmitter being transmitted over another channel so it can be consumed exactly-one by someone later.")?;
68
69        // Use this channel to send the channel receiver above from this constructor
70        // to where it will be consumed in the "serve" method later...
71        // using channels avoids having to do a complex sync dance using mutable globals.
72        let (outgoing_conninfo_receiver_transmitter, outgoing_conninfo_receiver_receiver) =
73            unbounded_channel();
74        outgoing_conninfo_receiver_transmitter.send(outgoing_conninfo_receiver).context("Unable to send the outgoing_conninfo_receiver to the transmitter. This is a tokio mpsc channel's receiver being transmitted over another channel so it can be consumed exactly-one by someone later.")?;
75
76        // Use this channel to send the channel from where we will receive ConnInfo's coming inbound
77        // from the host, to the broker which will know what to do with them
78        // This channel/stream of ConnInfo's will be received from the GRPCBroker in the start_stream call
79        // and will be sent from there to the JsonRPCBroker who will broker the ConnInfo's towards the host.
80        let (incoming_conninfo_stream_sender, incoming_conninfo_stream_receiver) =
81            unbounded_channel();
82
83        // Do the same dance of channel-of-channels to send the receiver since the underlying stream won't be available
84        // for quite some time.
85        let (
86            incoming_conninfo_stream_receiver_transmitter,
87            incoming_conninfo_stream_receiver_receiver,
88        ) = unbounded_channel();
89
90        incoming_conninfo_stream_receiver_transmitter.send(incoming_conninfo_stream_receiver)
91            .context("Unable to send the incoming_conninfo_stream_receiver to the transmitter. This is a tokio mpsc channel's receiver's receiver being transmitted over another channel so it can be consumed exactly-one by someone later. They will eventually listen to this channel to then get the actual stream over which they'll receive incoming ConnInfo's.")?;
92
93        let (trigger, listener) = triggered::trigger();
94
95        Ok(Server {
96            handshake_config,
97            protocol_version,
98            outgoing_conninfo_sender_receiver,
99            outgoing_conninfo_receiver_receiver,
100            incoming_conninfo_stream_sender,
101            incoming_conninfo_stream_receiver_receiver,
102            trigger,
103            listener,
104        })
105    }
106
107    pub async fn grpc_broker(&mut self) -> Result<GRpcBroker, Error> {
108        let outgoing_conninfo_sender = match self.outgoing_conninfo_sender_receiver.recv().await {
109            None => {
110                let err = anyhow!("jsonrpc_server_broker's transmission channel was None, which, being initalized in the constructor, was vended off already. Was this method called twice? Did someone else .recv() it?");
111                log::error!("{}", err);
112                return Err(Error::Other(err));
113            }
114            Some(outgoing_conninfo_sender) => outgoing_conninfo_sender,
115        };
116
117        let incoming_conninfo_stream_receiver = match self
118            .incoming_conninfo_stream_receiver_receiver
119            .recv()
120            .await
121        {
122            None => {
123                let err = anyhow!("jsonrpc_server_broker's receiver for a future incoming stream of ConnInfo was None, which, being initalized in the constructor, was vended off already.");
124                log::error!("{}", err);
125                return Err(Error::Other(err));
126            }
127            Some(outgoing_conninfo_sender) => outgoing_conninfo_sender,
128        };
129
130        // create the JSON-RPC 2.0 server broker
131        log::trace!("Creating the JSON RPC 2.0 Server Broker.",);
132        let jsonrpc_broker = GRpcBroker::new(
133            unique_port::UniquePort::new(),
134            outgoing_conninfo_sender,
135            incoming_conninfo_stream_receiver,
136            self.listener.clone(),
137        );
138
139        log::info!("Created JSON RPC 2.0 Server Broker.");
140
141        Ok(jsonrpc_broker)
142    }
143
144    // Copied from: https://github.com/hashicorp/go-plugin/blob/master/server.go#L247
145    fn validate_magic_cookie(&self) -> Result<(), Error> {
146        log::info!("Validating the magic environment cookies to conduct the handshake. Expecting environment variable {}={}.", self.handshake_config.magic_cookie_key, self.handshake_config.magic_cookie_value);
147        match env::var(&self.handshake_config.magic_cookie_key) {
148            Ok(value) => {
149                if value == self.handshake_config.magic_cookie_value {
150                    log::info!("Handshake succeeded!");
151                    return Ok(());
152                } else {
153                    log::error!("Handshake failed due to environment variable {}'s value being {}, but expected to be {}.",self.handshake_config.magic_cookie_key, value, self.handshake_config.magic_cookie_value);
154                }
155            }
156            Err(e) => log::error!(
157                "Handshake failed due to error reading environment variable {}: {:?}",
158                self.handshake_config.magic_cookie_key,
159                e
160            ),
161        }
162
163        Err(Error::GRPCHandshakeMagicCookieValueMismatch)
164    }
165
166    pub async fn serve<S>(&mut self, plugin: S) -> Result<(), Error>
167    where
168        S: Service<Request<Body>, Response = Response<BoxBody>>
169            + NamedService
170            + Clone
171            + Send
172            + 'static,
173        <S as Service<http::Request<hyper::Body>>>::Future: Send + 'static,
174        <S as Service<http::Request<hyper::Body>>>::Error:
175            Into<Box<dyn std::error::Error + Send + Sync>> + Send,
176    {
177        log::trace!("serving over a Tcp Socket...");
178
179        self.validate_magic_cookie().context("Failed to validate magic cookie handshake from plugin client (i.e. host, i.e. consumer) to this Plugin.")?;
180
181        let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
182        health_reporter.set_serving::<S>().await;
183        log::info!("gRPC Health Service created.");
184
185        let temp_socket = TempSocket::new()
186            .context("Failed to create a new TempSocket for opening the main gRPC listener to")?;
187        let socket_path = temp_socket.socket_filename()
188            .context("Failed to get a temporary socket filename from the temp socket for opening the main gRPC listener to")?;
189        log::trace!("Created new temp socket: {}", socket_path);
190
191        let handshakestr = format!(
192            "{}|{}|unix|{}|grpc|",
193            GRPC_CORE_PROTOCOL_VERSION, self.protocol_version, socket_path,
194        );
195        log::trace!("Created Handshake string: {}", handshakestr);
196
197        // create incoming stream from unix socket above...
198        let incoming_stream_from_socket = unix::incoming_from_path(socket_path.as_str()).await?;
199        log::trace!("Created Incoming unix stream from the socket");
200
201        let outgoing_conninfo_receiver = match self.outgoing_conninfo_receiver_receiver.recv().await {
202            Some(outgoing_conninfo_receiver) => outgoing_conninfo_receiver,
203            None => return Err(Error::Other(anyhow!("Outgoing ConnInfo receiver does not exist. Did someone else .recv() it before? It was created in the constructor, so should be available in the method."))),
204        };
205
206        log::info!("Creating a GRPC Broker Server.");
207        // mspc Senders can be cloned. Receivers need all the attention and queueing.
208        let broker_server = grpc_broker_service::new_server(
209            outgoing_conninfo_receiver,
210            self.incoming_conninfo_stream_sender.clone(),
211        )
212        .await?;
213
214        log::info!("Creating a GRPC Controller Server.");
215        let controller_server = grpc_controller::new_server(self.trigger.clone());
216        log::info!("Creating a GRPC Stdio Server.");
217        let stdio_server = grpc_stdio::new_server();
218
219        let listener = self.listener.clone();
220        log::info!("Starting service...");
221        let grpc_service_future = tonic::transport::Server::builder()
222            .add_service(health_service)
223            .add_service(broker_server)
224            .add_service(controller_server)
225            .add_service(stdio_server)
226            .add_service(plugin)
227            .serve_with_incoming_shutdown(incoming_stream_from_socket, async { listener.await });
228
229        log::info!("About to print handshake string: {}", handshakestr);
230        println!("{}", handshakestr);
231
232        // starting broker and plugin services now...
233        //join!(broker_service_future, plugin_service_future);
234        let result = grpc_service_future.await;
235
236        log::info!("gRPC broker service ended with result: {:?}", result);
237
238        Ok(())
239    }
240}