1use std::{
2 collections::HashMap,
3 ffi::{OsStr, OsString},
4 future::Future,
5 marker::PhantomData,
6 path::{Path, PathBuf},
7 process::Stdio,
8 str::FromStr,
9 sync::Arc,
10 thread,
11};
12
13use ipc_channel::ipc::{IpcOneShotServer, IpcSender};
14use tokio::{
15 process::{Child, Command},
16 sync::{mpsc, watch},
17 time::{Duration, Instant},
18};
19use uuid::Uuid;
20
21use crate::{
22 get_log_prefix, ConnectionStatus, IpcReceiveStream, IpcReplyFuture, IpcRpcError,
23 PendingReplyEntry, SchemaValidationStatus, UserMessage,
24};
25
26#[cfg(feature = "message-schema-validation")]
27use schemars::{schema_for, Schema};
28
29use super::{ConnectionKey, InternalMessage, InternalMessageKind};
30
31async fn process_outgoing_server_mail<U: UserMessage>(
35 mut internal_receiver: mpsc::UnboundedReceiver<InternalMessage<U>>,
36 ipc_sender: IpcSender<InternalMessage<U>>,
37 log_prefix: Arc<str>,
38) {
39 log::info!("{}Processing outgoing server mail!", log_prefix);
40 while let Some(message) = internal_receiver.recv().await {
41 if let Err(e) = ipc_sender.send(message) {
42 log::error!("{}Failed to send message to client: {:?}", log_prefix, e);
43 }
44 }
45 log::info!("{}Exiting outgoing server mail loop", log_prefix)
46}
47
48#[derive(Debug, Clone)]
50pub struct IpcRpcServer<U: UserMessage> {
51 sender: mpsc::UnboundedSender<InternalMessage<U>>,
52 status_receiver: watch::Receiver<ConnectionStatus>,
53 #[cfg(feature = "message-schema-validation")]
54 validation_receiver: watch::Receiver<Option<SchemaValidationStatus>>,
55 pending_reply_sender: mpsc::UnboundedSender<PendingReplyEntry<U>>,
56 log_prefix: Arc<str>,
57}
58
59#[derive(Debug)]
62pub struct IpcRpc<U: UserMessage> {
63 pub server: IpcRpcServer<U>,
64 client_process: Child,
65}
66
67impl<U: UserMessage> IpcRpcServer<U> {
68 pub async fn initialize_server<F, Fut>(
71 message_handler: F,
72 ) -> Result<(ConnectionKey, IpcRpcServer<U>), IpcRpcError>
73 where
74 F: Fn(U) -> Fut + Send + Sync + 'static,
75 Fut: Future<Output = Option<U>> + Send,
76 {
77 let (server, server_name) = IpcOneShotServer::<InternalMessage<U>>::new()?;
78 let (internal_sender, internal_receiver) = mpsc::unbounded_channel::<InternalMessage<U>>();
79 let runtime = tokio::runtime::Handle::current();
80 let (status_sender, status_receiver) = watch::channel(ConnectionStatus::WaitingForClient);
81 #[cfg(feature = "message-schema-validation")]
82 let (validation_sender, validation_receiver) = watch::channel(None);
83 let log_prefix = Arc::from(get_log_prefix(true));
84 let (pending_reply_sender, pending_reply_reciever) = mpsc::unbounded_channel();
85 let pending_reply_sender_clone = pending_reply_sender.clone();
86 log::info!("{}Server initialized!", log_prefix);
87 thread::spawn({
88 let log_prefix = Arc::clone(&log_prefix);
89 move || {
90 Self::startup(
91 server,
92 internal_receiver,
93 pending_reply_sender_clone,
94 pending_reply_reciever,
95 message_handler,
96 status_sender,
97 #[cfg(feature = "message-schema-validation")]
98 validation_sender,
99 log_prefix,
100 runtime,
101 )
102 }
103 });
104
105 Ok((
106 ConnectionKey::from_str(&server_name).expect("server_name is always uuid"),
107 IpcRpcServer {
108 sender: internal_sender,
109 pending_reply_sender,
110 status_receiver,
111 #[cfg(feature = "message-schema-validation")]
112 validation_receiver,
113 log_prefix,
114 },
115 ))
116 }
117
118 #[allow(clippy::too_many_arguments)]
123 fn startup<Fut: Future<Output = Option<U>> + Send, F: Fn(U) -> Fut + Send + Sync + 'static>(
124 server: IpcOneShotServer<InternalMessage<U>>,
125 internal_receiver: mpsc::UnboundedReceiver<InternalMessage<U>>,
126 #[allow(unused)] pending_reply_sender: mpsc::UnboundedSender<PendingReplyEntry<U>>,
127 pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
128 message_handler: F,
129 status_sender: watch::Sender<ConnectionStatus>,
130 #[cfg(feature = "message-schema-validation")] validation_sender: watch::Sender<
131 Option<SchemaValidationStatus>,
132 >,
133 log_prefix: Arc<str>,
134 runtime: tokio::runtime::Handle,
135 ) {
136 let new_client = server.accept();
138 let _handle = runtime.enter();
140 match new_client {
141 Err(e) => {
142 log::error!("{}Error opening connection to client {:?}", log_prefix, e);
143 let e = IpcRpcError::from(e);
144 let _ = status_sender.send(ConnectionStatus::DisconnectError(e));
145 }
146 Ok((receive_from_client, first_message)) => {
147 if let InternalMessageKind::InitConnection(ipc_sender) = first_message.kind {
148 let _ = status_sender.send(ConnectionStatus::Connected);
149 log::info!("{}Connection established!", log_prefix);
150 #[cfg(feature = "message-schema-validation")]
151 {
152 let (sender, receiver) = mpsc::unbounded_channel();
153 let message = InternalMessage {
154 uuid: Uuid::new_v4(),
155 kind: InternalMessageKind::UserMessageSchema(
156 serde_json::to_string(&schema_for!(U))
157 .expect("upstream guarantees this won't fail"),
158 ),
159 };
160 if let Err(e) = pending_reply_sender.send((
161 message.uuid,
162 (sender, Instant::now() + crate::DEFAULT_REPLY_TIMEOUT),
163 )) {
164 log::error!("Failed to send entry for reply drop box {:?}", e);
165 }
166 match ipc_sender.send(message) {
167 Ok(()) => {
168 let reply_future = IpcReplyFuture { receiver };
169 tokio::spawn(async move {
170 match reply_future.await {
171 Ok(InternalMessageKind::UserMessageSchemaOk) => {
172 log::info!(
173 "Remote client validated user message schema"
174 );
175 if let Err(e) = validation_sender
176 .send(Some(SchemaValidationStatus::SchemasMatched))
177 {
178 log::error!(
179 "Failed to set validation_status {e:#?}"
180 );
181 }
182 }
183 Ok(InternalMessageKind::UserMessageSchemaError {
184 other_schema,
185 }) => {
186 let my_schema = schema_for!(U);
187 let res = validation_sender.send(Some(
188 SchemaValidationStatus::SchemaMismatch {
189 our_schema: serde_json::to_string(&my_schema)
190 .expect(
191 "upstream guarantees this won't fail",
192 ),
193 their_schema: other_schema.clone(),
194 },
195 ));
196 if let Err(e) = res {
197 log::error!(
198 "Failed to set validation_status {e:#?}"
199 );
200 }
201 match serde_json::from_str::<Schema>(&other_schema) {
202 Ok(other_schema) => {
203 if other_schema == my_schema {
204 log::error!("Client failed validation on user message schema, but the schemas match. This is probably a bug in ipc-rpc.");
205 } else {
206 log::error!("Failed to validate that user messages have the same schema. Messages may fail to serialize and deserialize correctly. This is a serious problem.\nServer Schema {my_schema:#?}\nClient Schema {other_schema:#?}")
207 }
208 }
209 Err(_) => {
210 log::error!("Server failed validation on user schema, and we failed to deserialize incoming schema properly, got {other_schema:?}");
211 }
212 }
213 }
214 Ok(m) => {
215 log::error!("Unexpected reply for user message schema validation {m:#?}");
216 if let Err(e) = validation_sender.send(Some(SchemaValidationStatus::ValidationNotPerformedProperly)) {
217 log::error!("Failed to set validation_status {e:#?}");
218 }
219 }
220 Err(IpcRpcError::ConnectionDropped) => {
221 }
223 Err(e) => {
224 log::error!("Failed to validate user message schema, messages may fail to serialize and deserialize correctly. Was the client compiled without the message-schema-validation feature? {e:#?}");
225 if let Err(e) = validation_sender.send(Some(SchemaValidationStatus::ValidationCommunicationFailed(e))) {
226 log::error!("Failed to set validation_status {e:#?}");
227 }
228 }
229 }
230 });
231 }
232 Err(e) => {
233 log::error!("Failed to send validation request to client {e:#?}");
234 }
235 }
236 }
237 thread::Builder::new()
241 .name("outgoing_mail".to_string())
242 .spawn({
243 let ipc_sender = ipc_sender.clone();
244 let runtime = runtime.clone();
245 move || {
246 runtime.block_on(process_outgoing_server_mail(
247 internal_receiver,
248 ipc_sender,
249 log_prefix,
250 ));
251 }
252 })
253 .unwrap();
254 runtime.spawn(async move {
255 crate::process_incoming_mail(
257 true,
258 pending_reply_receiver,
259 IpcReceiveStream::new(receive_from_client),
260 message_handler,
261 ipc_sender,
262 status_sender,
263 )
264 .await;
265 });
266 } else {
267 log::error!("{}First message received was not an InitConnection message. Dropping connection.", log_prefix);
268 let _ = status_sender.send(ConnectionStatus::DisconnectError(
269 IpcRpcError::HandshakeFailure,
270 ));
271 }
272 }
273 }
274 }
275
276 fn internal_send(
277 &self,
278 message_kind: InternalMessageKind<U>,
279 timeout: Duration,
280 ) -> impl Future<Output = Result<InternalMessageKind<U>, IpcRpcError>> + Send + 'static {
281 let message = InternalMessage {
282 uuid: Uuid::new_v4(),
283 kind: message_kind,
284 };
285 let (sender, receiver) = mpsc::unbounded_channel();
286 if let Err(e) = self
287 .pending_reply_sender
288 .send((message.uuid, (sender, Instant::now() + timeout)))
289 {
290 log::error!("Failed to send entry for reply drop box {:?}", e);
291 }
292 self.sender.send(message).unwrap();
293 IpcReplyFuture { receiver }
294 }
295
296 pub fn send(
298 &self,
299 user_message: U,
300 ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
301 self.send_timeout(user_message, crate::DEFAULT_REPLY_TIMEOUT)
302 }
303
304 pub fn send_timeout(
306 &self,
307 user_message: U,
308 timeout: Duration,
309 ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
310 let send_fut = self.internal_send(InternalMessageKind::UserMessage(user_message), timeout);
311 async move {
312 send_fut.await.map(|m| match m {
313 InternalMessageKind::UserMessage(m) => m,
314 _ => panic!(
315 "Got a non-user message reply to a user message. This is a bug in ipc-rpc."
316 ),
317 })
318 }
319 }
320
321 pub fn client_connected(&self) -> bool {
322 matches!(*self.status_receiver.borrow(), ConnectionStatus::Connected)
323 }
324
325 pub fn wait_for_client_to_connect(
326 &mut self,
327 ) -> impl Future<Output = Result<(), IpcRpcError>> + Send + 'static {
328 let mut status_receiver = self.status_receiver.clone();
329 async move {
330 loop {
331 {
334 let borrow = status_receiver.borrow();
335 if let ConnectionStatus::Connected = *borrow {
336 return Ok(());
337 }
338 if let Some(r) = borrow.session_end_result() {
339 return r;
340 }
341 }
342 if status_receiver.changed().await.is_err() {
343 return Err(IpcRpcError::ConnectionDropped);
344 }
345 }
346 }
347 }
348
349 pub fn wait_for_client_to_disconnect(
350 &mut self,
351 ) -> impl Future<Output = Result<(), IpcRpcError>> + Send + 'static {
352 let mut status_receiver = self.status_receiver.clone();
353 async move {
354 if let Some(r) = status_receiver.borrow().session_end_result() {
356 return r;
357 }
358 loop {
360 if status_receiver.changed().await.is_err() {
361 return Err(IpcRpcError::ConnectionDropped);
362 }
363 if let Some(r) = status_receiver.borrow().session_end_result() {
364 return r;
365 }
366 }
367 }
368 }
369
370 pub async fn schema_validated(&mut self) -> Result<SchemaValidationStatus, IpcRpcError> {
373 #[cfg(not(feature = "message-schema-validation"))]
374 {
375 Ok(SchemaValidationStatus::ValidationDisabledAtCompileTime)
376 }
377 #[cfg(feature = "message-schema-validation")]
378 {
379 if self.validation_receiver.borrow_and_update().is_none() {
380 self.validation_receiver
381 .changed()
382 .await
383 .map_err(|_| IpcRpcError::ConnectionDropped)?;
384 }
385 Ok(self
386 .validation_receiver
387 .borrow()
388 .as_ref()
389 .expect("the prior guaranteed this isn't empty")
390 .clone())
391 }
392 }
393}
394
395impl<U: UserMessage> IpcRpc<U> {
396 pub fn build() -> IpcRpcBuilder<U> {
397 IpcRpcBuilder::new()
398 }
399
400 async fn initialize_server_with_client<SE, F, Fut, A, ENVS, SK, SV>(
415 path_to_exe: SE,
416 message_handler: F,
417 arguments_fn: A,
418 env_vars: ENVS,
419 current_dir: Option<&Path>,
420 ) -> Result<IpcRpc<U>, IpcRpcError>
421 where
422 SE: AsRef<OsStr>,
423 F: Fn(U) -> Fut + Send + Sync + 'static,
424 Fut: Future<Output = Option<U>> + Send,
425 A: FnOnce(ConnectionKey, &mut Command),
426 ENVS: IntoIterator<Item = (SK, SV)>,
427 SK: AsRef<OsStr>,
428 SV: AsRef<OsStr>,
429 {
430 let (server_connect_key, mut server) =
431 IpcRpcServer::initialize_server(message_handler).await?;
432 log::info!(
433 "Starting {} in dir {:?}",
434 path_to_exe.as_ref().to_string_lossy(),
435 current_dir.as_ref().map(|p| p.display()),
436 );
437 let mut command = Command::new(path_to_exe);
438 command
439 .stdin(Stdio::piped())
440 .envs(env_vars)
441 .kill_on_drop(true);
442 if let Some(current_dir) = current_dir {
443 command.current_dir(current_dir);
444 }
445 arguments_fn(server_connect_key, &mut command);
446 let client_process = command.spawn()?;
447 match client_process.id() {
448 Some(pid) => {
449 log::info!("Started client with PID {}", pid);
450 }
451 None => {
452 log::error!("Started a client but it exited immediately")
453 }
454 }
455 server.wait_for_client_to_connect().await.unwrap();
456 Ok(IpcRpc {
457 server,
458 client_process,
459 })
460 }
461
462 pub fn send(
464 &self,
465 user_message: U,
466 ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
467 self.server.send(user_message)
468 }
469
470 pub fn send_timeout(
472 &self,
473 user_message: U,
474 timeout: Duration,
475 ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
476 self.server.send_timeout(user_message, timeout)
477 }
478
479 pub async fn schema_validated(&mut self) -> Result<SchemaValidationStatus, IpcRpcError> {
482 self.server.schema_validated().await
483 }
484}
485
486impl<U: UserMessage> Drop for IpcRpc<U> {
487 fn drop(&mut self) {
488 if let Ok(Some(status)) = self.client_process.try_wait() {
489 if !status.success() {
490 log::error!(
491 "{}Child process exited unsuccessfully, code: {:?}",
492 self.server.log_prefix,
493 status.code()
494 )
495 }
496 } else {
497 log::info!("{}Child process still running", self.server.log_prefix);
498 }
499 }
500}
501
502impl<U: UserMessage> Drop for IpcRpcServer<U> {
503 fn drop(&mut self) {
504 if let Err(e) = self.sender.send(InternalMessage {
505 uuid: Uuid::new_v4(),
506 kind: InternalMessageKind::Hangup,
507 }) {
508 log::error!(
509 "{}Error sending hangup message to client: {:?}",
510 self.log_prefix,
511 e
512 );
513 }
514 }
515}
516
517#[derive(Debug, Clone)]
519pub struct IpcRpcBuilder<U: UserMessage> {
520 current_dir: Option<PathBuf>,
521 env_vars: HashMap<OsString, OsString>,
522 phantom: PhantomData<U>,
523}
524
525impl<U: UserMessage> IpcRpcBuilder<U> {
526 fn new() -> Self {
527 Self {
528 current_dir: None,
529 env_vars: HashMap::new(),
530 phantom: PhantomData,
531 }
532 }
533
534 pub fn env<K: Into<OsString>, V: Into<OsString>>(&mut self, key: K, value: V) -> &mut Self {
536 self.env_vars.insert(key.into(), value.into());
537 self
538 }
539
540 pub fn envs<K: Into<OsString>, V: Into<OsString>, I: Iterator<Item = (K, V)>>(
542 &mut self,
543 iter: I,
544 ) -> &mut Self {
545 self.env_vars
546 .extend(iter.map(|(k, v)| (k.into(), v.into())));
547 self
548 }
549
550 pub fn current_dir<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
552 self.current_dir = Some(path.into());
553 self
554 }
555
556 pub async fn finish<SE, F, Fut, A>(
566 &mut self,
567 path_to_exe: SE,
568 message_handler: F,
569 arguments_fn: A,
570 ) -> Result<IpcRpc<U>, IpcRpcError>
571 where
572 SE: AsRef<OsStr>,
573 F: Fn(U) -> Fut + Send + Sync + 'static,
574 Fut: Future<Output = Option<U>> + Send,
575 A: FnOnce(ConnectionKey, &mut Command),
576 {
577 IpcRpc::initialize_server_with_client(
578 path_to_exe,
579 message_handler,
580 arguments_fn,
581 self.env_vars
582 .iter()
583 .map(|(k, v)| (k.as_os_str(), v.as_os_str())),
584 self.current_dir.as_deref(),
585 )
586 .await
587 }
588}
589
590impl<U: UserMessage> Default for IpcRpcBuilder<U> {
591 fn default() -> Self {
592 Self::new()
593 }
594}