1pub mod error;
4mod grpc_broker;
5mod grpc_broker_service;
6mod grpc_controller;
7mod grpc_stdio;
8mod unique_port;
9pub mod unix;
10
11use error::Error;
12
13use anyhow::{anyhow, Context, Result};
14use http::{Request, Response};
15use hyper::Body;
16use std::clone::Clone;
17use std::env;
18use std::marker::Send;
19use tonic::body::BoxBody;
20use tonic::transport::NamedService;
21use tower::Service;
22use unix::TempSocket;
23
24pub use grpc_broker::GRpcBroker;
25pub use grpc_broker_service::grpc_plugins::ConnInfo;
26pub use tonic::{Status, Streaming};
27
28use tokio::sync::mpsc::{unbounded_channel, UnboundedReceiver, UnboundedSender};
29
30pub type ServiceId = u32;
31
32const GRPC_CORE_PROTOCOL_VERSION: usize = 1;
35
36pub struct HandshakeConfig {
37 pub magic_cookie_key: String,
38 pub magic_cookie_value: String,
39}
40
41pub struct Server {
42 handshake_config: HandshakeConfig,
43 protocol_version: u32,
44 outgoing_conninfo_sender_receiver: UnboundedReceiver<UnboundedSender<Result<ConnInfo, Status>>>,
45 outgoing_conninfo_receiver_receiver:
46 UnboundedReceiver<UnboundedReceiver<Result<ConnInfo, Status>>>,
47 incoming_conninfo_stream_sender: UnboundedSender<Streaming<ConnInfo>>,
48 incoming_conninfo_stream_receiver_receiver:
49 UnboundedReceiver<UnboundedReceiver<Streaming<ConnInfo>>>,
50 trigger: triggered::Trigger,
51 listener: triggered::Listener,
52}
53
54impl Server {
55 pub fn new(protocol_version: u32, handshake_config: HandshakeConfig) -> Result<Server, Error> {
56 let (outgoing_conninfo_sender, outgoing_conninfo_receiver) =
60 unbounded_channel::<Result<ConnInfo, Status>>();
61
62 let (outgoing_conninfo_sender_transmitter, outgoing_conninfo_sender_receiver) =
66 unbounded_channel();
67 outgoing_conninfo_sender_transmitter.send(outgoing_conninfo_sender).context("Unable to send the outgoing_conninfo_sender to the transmitter. This is a tokio mpsc channel's transmitter being transmitted over another channel so it can be consumed exactly-one by someone later.")?;
68
69 let (outgoing_conninfo_receiver_transmitter, outgoing_conninfo_receiver_receiver) =
73 unbounded_channel();
74 outgoing_conninfo_receiver_transmitter.send(outgoing_conninfo_receiver).context("Unable to send the outgoing_conninfo_receiver to the transmitter. This is a tokio mpsc channel's receiver being transmitted over another channel so it can be consumed exactly-one by someone later.")?;
75
76 let (incoming_conninfo_stream_sender, incoming_conninfo_stream_receiver) =
81 unbounded_channel();
82
83 let (
86 incoming_conninfo_stream_receiver_transmitter,
87 incoming_conninfo_stream_receiver_receiver,
88 ) = unbounded_channel();
89
90 incoming_conninfo_stream_receiver_transmitter.send(incoming_conninfo_stream_receiver)
91 .context("Unable to send the incoming_conninfo_stream_receiver to the transmitter. This is a tokio mpsc channel's receiver's receiver being transmitted over another channel so it can be consumed exactly-one by someone later. They will eventually listen to this channel to then get the actual stream over which they'll receive incoming ConnInfo's.")?;
92
93 let (trigger, listener) = triggered::trigger();
94
95 Ok(Server {
96 handshake_config,
97 protocol_version,
98 outgoing_conninfo_sender_receiver,
99 outgoing_conninfo_receiver_receiver,
100 incoming_conninfo_stream_sender,
101 incoming_conninfo_stream_receiver_receiver,
102 trigger,
103 listener,
104 })
105 }
106
107 pub async fn grpc_broker(&mut self) -> Result<GRpcBroker, Error> {
108 let outgoing_conninfo_sender = match self.outgoing_conninfo_sender_receiver.recv().await {
109 None => {
110 let err = anyhow!("jsonrpc_server_broker's transmission channel was None, which, being initalized in the constructor, was vended off already. Was this method called twice? Did someone else .recv() it?");
111 log::error!("{}", err);
112 return Err(Error::Other(err));
113 }
114 Some(outgoing_conninfo_sender) => outgoing_conninfo_sender,
115 };
116
117 let incoming_conninfo_stream_receiver = match self
118 .incoming_conninfo_stream_receiver_receiver
119 .recv()
120 .await
121 {
122 None => {
123 let err = anyhow!("jsonrpc_server_broker's receiver for a future incoming stream of ConnInfo was None, which, being initalized in the constructor, was vended off already.");
124 log::error!("{}", err);
125 return Err(Error::Other(err));
126 }
127 Some(outgoing_conninfo_sender) => outgoing_conninfo_sender,
128 };
129
130 log::trace!("Creating the JSON RPC 2.0 Server Broker.",);
132 let jsonrpc_broker = GRpcBroker::new(
133 unique_port::UniquePort::new(),
134 outgoing_conninfo_sender,
135 incoming_conninfo_stream_receiver,
136 self.listener.clone(),
137 );
138
139 log::info!("Created JSON RPC 2.0 Server Broker.");
140
141 Ok(jsonrpc_broker)
142 }
143
144 fn validate_magic_cookie(&self) -> Result<(), Error> {
146 log::info!("Validating the magic environment cookies to conduct the handshake. Expecting environment variable {}={}.", self.handshake_config.magic_cookie_key, self.handshake_config.magic_cookie_value);
147 match env::var(&self.handshake_config.magic_cookie_key) {
148 Ok(value) => {
149 if value == self.handshake_config.magic_cookie_value {
150 log::info!("Handshake succeeded!");
151 return Ok(());
152 } else {
153 log::error!("Handshake failed due to environment variable {}'s value being {}, but expected to be {}.",self.handshake_config.magic_cookie_key, value, self.handshake_config.magic_cookie_value);
154 }
155 }
156 Err(e) => log::error!(
157 "Handshake failed due to error reading environment variable {}: {:?}",
158 self.handshake_config.magic_cookie_key,
159 e
160 ),
161 }
162
163 Err(Error::GRPCHandshakeMagicCookieValueMismatch)
164 }
165
166 pub async fn serve<S>(&mut self, plugin: S) -> Result<(), Error>
167 where
168 S: Service<Request<Body>, Response = Response<BoxBody>>
169 + NamedService
170 + Clone
171 + Send
172 + 'static,
173 <S as Service<http::Request<hyper::Body>>>::Future: Send + 'static,
174 <S as Service<http::Request<hyper::Body>>>::Error:
175 Into<Box<dyn std::error::Error + Send + Sync>> + Send,
176 {
177 log::trace!("serving over a Tcp Socket...");
178
179 self.validate_magic_cookie().context("Failed to validate magic cookie handshake from plugin client (i.e. host, i.e. consumer) to this Plugin.")?;
180
181 let (mut health_reporter, health_service) = tonic_health::server::health_reporter();
182 health_reporter.set_serving::<S>().await;
183 log::info!("gRPC Health Service created.");
184
185 let temp_socket = TempSocket::new()
186 .context("Failed to create a new TempSocket for opening the main gRPC listener to")?;
187 let socket_path = temp_socket.socket_filename()
188 .context("Failed to get a temporary socket filename from the temp socket for opening the main gRPC listener to")?;
189 log::trace!("Created new temp socket: {}", socket_path);
190
191 let handshakestr = format!(
192 "{}|{}|unix|{}|grpc|",
193 GRPC_CORE_PROTOCOL_VERSION, self.protocol_version, socket_path,
194 );
195 log::trace!("Created Handshake string: {}", handshakestr);
196
197 let incoming_stream_from_socket = unix::incoming_from_path(socket_path.as_str()).await?;
199 log::trace!("Created Incoming unix stream from the socket");
200
201 let outgoing_conninfo_receiver = match self.outgoing_conninfo_receiver_receiver.recv().await {
202 Some(outgoing_conninfo_receiver) => outgoing_conninfo_receiver,
203 None => return Err(Error::Other(anyhow!("Outgoing ConnInfo receiver does not exist. Did someone else .recv() it before? It was created in the constructor, so should be available in the method."))),
204 };
205
206 log::info!("Creating a GRPC Broker Server.");
207 let broker_server = grpc_broker_service::new_server(
209 outgoing_conninfo_receiver,
210 self.incoming_conninfo_stream_sender.clone(),
211 )
212 .await?;
213
214 log::info!("Creating a GRPC Controller Server.");
215 let controller_server = grpc_controller::new_server(self.trigger.clone());
216 log::info!("Creating a GRPC Stdio Server.");
217 let stdio_server = grpc_stdio::new_server();
218
219 let listener = self.listener.clone();
220 log::info!("Starting service...");
221 let grpc_service_future = tonic::transport::Server::builder()
222 .add_service(health_service)
223 .add_service(broker_server)
224 .add_service(controller_server)
225 .add_service(stdio_server)
226 .add_service(plugin)
227 .serve_with_incoming_shutdown(incoming_stream_from_socket, async { listener.await });
228
229 log::info!("About to print handshake string: {}", handshakestr);
230 println!("{}", handshakestr);
231
232 let result = grpc_service_future.await;
235
236 log::info!("gRPC broker service ended with result: {:?}", result);
237
238 Ok(())
239 }
240}