ipc_rpc/
server.rs

1use std::{
2    collections::HashMap,
3    ffi::{OsStr, OsString},
4    future::Future,
5    marker::PhantomData,
6    path::{Path, PathBuf},
7    process::Stdio,
8    str::FromStr,
9    sync::Arc,
10    thread,
11};
12
13use ipc_channel::ipc::{IpcOneShotServer, IpcSender};
14use tokio::{
15    process::{Child, Command},
16    sync::{mpsc, watch},
17    time::{Duration, Instant},
18};
19use uuid::Uuid;
20
21use crate::{
22    get_log_prefix, ConnectionStatus, IpcReceiveStream, IpcReplyFuture, IpcRpcError,
23    PendingReplyEntry, SchemaValidationStatus, UserMessage,
24};
25
26#[cfg(feature = "message-schema-validation")]
27use schemars::{schema_for, Schema};
28
29use super::{ConnectionKey, InternalMessage, InternalMessageKind};
30
31/// Passes messages along from the internal mpsc channel to the actual IPC channel. This is necessary
32/// because the IPC channel is not available at the time the server is initialized. That channel
33/// only becomes available once a connection is established.
34async fn process_outgoing_server_mail<U: UserMessage>(
35    mut internal_receiver: mpsc::UnboundedReceiver<InternalMessage<U>>,
36    ipc_sender: IpcSender<InternalMessage<U>>,
37    log_prefix: Arc<str>,
38) {
39    log::info!("{}Processing outgoing server mail!", log_prefix);
40    while let Some(message) = internal_receiver.recv().await {
41        if let Err(e) = ipc_sender.send(message) {
42            log::error!("{}Failed to send message to client: {:?}", log_prefix, e);
43        }
44    }
45    log::info!("{}Exiting outgoing server mail loop", log_prefix)
46}
47
/// Used to send messages to the connected client (assuming one does connect)
///
/// The handle is cheap to clone: every field is a channel endpoint or an `Arc`.
#[derive(Debug, Clone)]
pub struct IpcRpcServer<U: UserMessage> {
    /// Queue of outgoing messages. The receiving half is handed to the startup thread, which
    /// forwards the queued messages to the client once a connection exists.
    sender: mpsc::UnboundedSender<InternalMessage<U>>,
    /// Observes the connection status published by the startup thread. Starts out as
    /// `ConnectionStatus::WaitingForClient`.
    status_receiver: watch::Receiver<ConnectionStatus>,
    /// Observes the outcome of schema validation. Holds `None` until an outcome is published.
    #[cfg(feature = "message-schema-validation")]
    validation_receiver: watch::Receiver<Option<SchemaValidationStatus>>,
    /// Registers, per outgoing message id, the channel a reply should be delivered on together
    /// with the deadline after which the reply is no longer awaited.
    pending_reply_sender: mpsc::UnboundedSender<PendingReplyEntry<U>>,
    /// Prefix prepended to log lines emitted on behalf of this server.
    log_prefix: Arc<str>,
}
58
/// Initializes a server and owns the connected client. This structure is the
/// preferred way for servers to manage a relationship with a client.
#[derive(Debug)]
pub struct IpcRpc<U: UserMessage> {
    /// The server half of the connection; use it to talk to the spawned client.
    pub server: IpcRpcServer<U>,
    /// Handle to the spawned client process. The process is configured with
    /// `kill_on_drop(true)` when it is launched, and its exit status is inspected when this
    /// structure is dropped.
    client_process: Child,
}
66
67impl<U: UserMessage> IpcRpcServer<U> {
68    /// Initializes a server and provides a [`ConnectionKey`] which clients can use to connect to it,
69    /// even out of process.
70    pub async fn initialize_server<F, Fut>(
71        message_handler: F,
72    ) -> Result<(ConnectionKey, IpcRpcServer<U>), IpcRpcError>
73    where
74        F: Fn(U) -> Fut + Send + Sync + 'static,
75        Fut: Future<Output = Option<U>> + Send,
76    {
77        let (server, server_name) = IpcOneShotServer::<InternalMessage<U>>::new()?;
78        let (internal_sender, internal_receiver) = mpsc::unbounded_channel::<InternalMessage<U>>();
79        let runtime = tokio::runtime::Handle::current();
80        let (status_sender, status_receiver) = watch::channel(ConnectionStatus::WaitingForClient);
81        #[cfg(feature = "message-schema-validation")]
82        let (validation_sender, validation_receiver) = watch::channel(None);
83        let log_prefix = Arc::from(get_log_prefix(true));
84        let (pending_reply_sender, pending_reply_reciever) = mpsc::unbounded_channel();
85        let pending_reply_sender_clone = pending_reply_sender.clone();
86        log::info!("{}Server initialized!", log_prefix);
87        thread::spawn({
88            let log_prefix = Arc::clone(&log_prefix);
89            move || {
90                Self::startup(
91                    server,
92                    internal_receiver,
93                    pending_reply_sender_clone,
94                    pending_reply_reciever,
95                    message_handler,
96                    status_sender,
97                    #[cfg(feature = "message-schema-validation")]
98                    validation_sender,
99                    log_prefix,
100                    runtime,
101                )
102            }
103        });
104
105        Ok((
106            ConnectionKey::from_str(&server_name).expect("server_name is always uuid"),
107            IpcRpcServer {
108                sender: internal_sender,
109                pending_reply_sender,
110                status_receiver,
111                #[cfg(feature = "message-schema-validation")]
112                validation_receiver,
113                log_prefix,
114            },
115        ))
116    }
117
    /// Responsible for initializing the various futures and threads needed to run the server.
    ///
    /// This may block: it waits on `server.accept()` until a client connects or accepting
    /// fails, so it must run on a thread that is allowed to block.
    ///
    /// Outcomes, as published through `status_sender`:
    ///
    /// - Accepting fails: `ConnectionStatus::DisconnectError` carrying the converted error.
    /// - The first message is `InitConnection`: `ConnectionStatus::Connected` is published,
    ///   the schema validation request is sent (only with the `message-schema-validation`
    ///   feature), a thread named `outgoing_mail` is started to forward queued messages to the
    ///   client, and a task processing incoming mail is spawned on `runtime`.
    /// - The first message is anything else:
    ///   `ConnectionStatus::DisconnectError(IpcRpcError::HandshakeFailure)`.
    ///
    /// # Panics
    ///
    /// Panics if the `outgoing_mail` thread cannot be spawned.
    // Reducing argument count here doesn't help with code clarity much.
    #[allow(clippy::too_many_arguments)]
    fn startup<Fut: Future<Output = Option<U>> + Send, F: Fn(U) -> Fut + Send + Sync + 'static>(
        server: IpcOneShotServer<InternalMessage<U>>,
        internal_receiver: mpsc::UnboundedReceiver<InternalMessage<U>>,
        // Only read when the `message-schema-validation` feature is enabled.
        #[allow(unused)] pending_reply_sender: mpsc::UnboundedSender<PendingReplyEntry<U>>,
        pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
        message_handler: F,
        status_sender: watch::Sender<ConnectionStatus>,
        #[cfg(feature = "message-schema-validation")] validation_sender: watch::Sender<
            Option<SchemaValidationStatus>,
        >,
        log_prefix: Arc<str>,
        runtime: tokio::runtime::Handle,
    ) {
        // This will block. Luckily it's okay to do that here.
        let new_client = server.accept();
        // Now that we're no longer blocking, enter the tokio runtime. The guard is kept alive
        // until the end of this function so `tokio::spawn` below has a runtime context.
        let _handle = runtime.enter();
        match new_client {
            Err(e) => {
                log::error!("{}Error opening connection to client {:?}", log_prefix, e);
                let e = IpcRpcError::from(e);
                let _ = status_sender.send(ConnectionStatus::DisconnectError(e));
            }
            Ok((receive_from_client, first_message)) => {
                // The handshake: the client's first message must carry the sender we use to
                // talk back to it.
                if let InternalMessageKind::InitConnection(ipc_sender) = first_message.kind {
                    let _ = status_sender.send(ConnectionStatus::Connected);
                    log::info!("{}Connection established!", log_prefix);
                    #[cfg(feature = "message-schema-validation")]
                    {
                        // Send our schema for `U` to the client and record its verdict in
                        // `validation_sender` once the reply arrives.
                        let (sender, receiver) = mpsc::unbounded_channel();
                        let message = InternalMessage {
                            uuid: Uuid::new_v4(),
                            kind: InternalMessageKind::UserMessageSchema(
                                serde_json::to_string(&schema_for!(U))
                                    .expect("upstream guarantees this won't fail"),
                            ),
                        };
                        // Register the reply slot before the request goes out.
                        if let Err(e) = pending_reply_sender.send((
                            message.uuid,
                            (sender, Instant::now() + crate::DEFAULT_REPLY_TIMEOUT),
                        )) {
                            log::error!("Failed to send entry for reply drop box {:?}", e);
                        }
                        // This bypasses the internal queue and writes straight to the client.
                        match ipc_sender.send(message) {
                            Ok(()) => {
                                let reply_future = IpcReplyFuture { receiver };
                                tokio::spawn(async move {
                                    match reply_future.await {
                                        Ok(InternalMessageKind::UserMessageSchemaOk) => {
                                            log::info!(
                                                "Remote client validated user message schema"
                                            );
                                            if let Err(e) = validation_sender
                                                .send(Some(SchemaValidationStatus::SchemasMatched))
                                            {
                                                log::error!(
                                                    "Failed to set validation_status {e:#?}"
                                                );
                                            }
                                        }
                                        Ok(InternalMessageKind::UserMessageSchemaError {
                                            other_schema,
                                        }) => {
                                            let my_schema = schema_for!(U);
                                            let res = validation_sender.send(Some(
                                                SchemaValidationStatus::SchemaMismatch {
                                                    our_schema: serde_json::to_string(&my_schema)
                                                        .expect(
                                                            "upstream guarantees this won't fail",
                                                        ),
                                                    their_schema: other_schema.clone(),
                                                },
                                            ));
                                            if let Err(e) = res {
                                                log::error!(
                                                    "Failed to set validation_status {e:#?}"
                                                );
                                            }
                                            // Parse the client's schema so the log can say
                                            // whether the two schemas really differ.
                                            match serde_json::from_str::<Schema>(&other_schema) {
                                                Ok(other_schema) => {
                                                    if other_schema == my_schema {
                                                        log::error!("Client failed validation on user message schema, but the schemas match. This is probably a bug in ipc-rpc.");
                                                    } else {
                                                        log::error!("Failed to validate that user messages have the same schema. Messages may fail to serialize and deserialize correctly. This is a serious problem.\nServer Schema {my_schema:#?}\nClient Schema {other_schema:#?}")
                                                    }
                                                }
                                                Err(_) => {
                                                    log::error!("Server failed validation on user schema, and we failed to deserialize incoming schema properly, got {other_schema:?}");
                                                }
                                            }
                                        }
                                        Ok(m) => {
                                            log::error!("Unexpected reply for user message schema validation {m:#?}");
                                            if let Err(e) = validation_sender.send(Some(SchemaValidationStatus::ValidationNotPerformedProperly)) {
                                                log::error!("Failed to set validation_status {e:#?}");
                                            }
                                        }
                                        Err(IpcRpcError::ConnectionDropped) => {
                                            // Do nothing, connection was dropped before validation completed.
                                        }
                                        Err(e) => {
                                            log::error!("Failed to validate user message schema, messages may fail to serialize and deserialize correctly. Was the client compiled without the message-schema-validation feature? {e:#?}");
                                            if let Err(e) = validation_sender.send(Some(SchemaValidationStatus::ValidationCommunicationFailed(e))) {
                                                log::error!("Failed to set validation_status {e:#?}");
                                            }
                                        }
                                    }
                                });
                            }
                            Err(e) => {
                                log::error!("Failed to send validation request to client {e:#?}");
                            }
                        }
                    }
                    // We need to run this outside of the tokio runtime worker pool, but still using
                    // the tokio runtime so that it continues to run correctly while tokio is shutting
                    // down.
                    thread::Builder::new()
                        .name("outgoing_mail".to_string())
                        .spawn({
                            let ipc_sender = ipc_sender.clone();
                            let runtime = runtime.clone();
                            move || {
                                runtime.block_on(process_outgoing_server_mail(
                                    internal_receiver,
                                    ipc_sender,
                                    log_prefix,
                                ));
                            }
                        })
                        .unwrap();
                    runtime.spawn(async move {
                        // This will only return when the client disconnects or some other kind of error happens.
                        crate::process_incoming_mail(
                            true,
                            pending_reply_receiver,
                            IpcReceiveStream::new(receive_from_client),
                            message_handler,
                            ipc_sender,
                            status_sender,
                        )
                        .await;
                    });
                } else {
                    log::error!("{}First message received was not an InitConnection message. Dropping connection.", log_prefix);
                    let _ = status_sender.send(ConnectionStatus::DisconnectError(
                        IpcRpcError::HandshakeFailure,
                    ));
                }
            }
        }
    }
275
276    fn internal_send(
277        &self,
278        message_kind: InternalMessageKind<U>,
279        timeout: Duration,
280    ) -> impl Future<Output = Result<InternalMessageKind<U>, IpcRpcError>> + Send + 'static {
281        let message = InternalMessage {
282            uuid: Uuid::new_v4(),
283            kind: message_kind,
284        };
285        let (sender, receiver) = mpsc::unbounded_channel();
286        if let Err(e) = self
287            .pending_reply_sender
288            .send((message.uuid, (sender, Instant::now() + timeout)))
289        {
290            log::error!("Failed to send entry for reply drop box {:?}", e);
291        }
292        self.sender.send(message).unwrap();
293        IpcReplyFuture { receiver }
294    }
295
296    /// Sends a message, will give up on receiving a reply after the [`DEFAULT_REPLY_TIMEOUT`](./constant.DEFAULT_REPLY_TIMEOUT.html) has passed.
297    pub fn send(
298        &self,
299        user_message: U,
300    ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
301        self.send_timeout(user_message, crate::DEFAULT_REPLY_TIMEOUT)
302    }
303
304    /// Sends a message, waiting the given `timeout` for a reply.
305    pub fn send_timeout(
306        &self,
307        user_message: U,
308        timeout: Duration,
309    ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
310        let send_fut = self.internal_send(InternalMessageKind::UserMessage(user_message), timeout);
311        async move {
312            send_fut.await.map(|m| match m {
313                InternalMessageKind::UserMessage(m) => m,
314                _ => panic!(
315                    "Got a non-user message reply to a user message. This is a bug in ipc-rpc."
316                ),
317            })
318        }
319    }
320
321    pub fn client_connected(&self) -> bool {
322        matches!(*self.status_receiver.borrow(), ConnectionStatus::Connected)
323    }
324
325    pub fn wait_for_client_to_connect(
326        &mut self,
327    ) -> impl Future<Output = Result<(), IpcRpcError>> + Send + 'static {
328        let mut status_receiver = self.status_receiver.clone();
329        async move {
330            loop {
331                // Put the borrow in an inner scope to make sure we only hold it for short
332                // durations.
333                {
334                    let borrow = status_receiver.borrow();
335                    if let ConnectionStatus::Connected = *borrow {
336                        return Ok(());
337                    }
338                    if let Some(r) = borrow.session_end_result() {
339                        return r;
340                    }
341                }
342                if status_receiver.changed().await.is_err() {
343                    return Err(IpcRpcError::ConnectionDropped);
344                }
345            }
346        }
347    }
348
349    pub fn wait_for_client_to_disconnect(
350        &mut self,
351    ) -> impl Future<Output = Result<(), IpcRpcError>> + Send + 'static {
352        let mut status_receiver = self.status_receiver.clone();
353        async move {
354            // Has the session already ended?
355            if let Some(r) = status_receiver.borrow().session_end_result() {
356                return r;
357            }
358            // If not, wait for the session to end.
359            loop {
360                if status_receiver.changed().await.is_err() {
361                    return Err(IpcRpcError::ConnectionDropped);
362                }
363                if let Some(r) = status_receiver.borrow().session_end_result() {
364                    return r;
365                }
366            }
367        }
368    }
369
370    /// Returns the outcome of automatic schema validation testing. This testing is performed
371    /// on connection initiation.
372    pub async fn schema_validated(&mut self) -> Result<SchemaValidationStatus, IpcRpcError> {
373        #[cfg(not(feature = "message-schema-validation"))]
374        {
375            Ok(SchemaValidationStatus::ValidationDisabledAtCompileTime)
376        }
377        #[cfg(feature = "message-schema-validation")]
378        {
379            if self.validation_receiver.borrow_and_update().is_none() {
380                self.validation_receiver
381                    .changed()
382                    .await
383                    .map_err(|_| IpcRpcError::ConnectionDropped)?;
384            }
385            Ok(self
386                .validation_receiver
387                .borrow()
388                .as_ref()
389                .expect("the prior guaranteed this isn't empty")
390                .clone())
391        }
392    }
393}
394
395impl<U: UserMessage> IpcRpc<U> {
396    pub fn build() -> IpcRpcBuilder<U> {
397        IpcRpcBuilder::new()
398    }
399
400    /// Initializes a server and client, connects the two, then returns a combination structure
401    /// which can be used for the server side of the relationship.
402    ///
403    /// # Params
404    ///
405    /// - path_to_exe: The path to the exe which is expected to connect to the server on startup
406    /// - message_handler:  A function for handling spontaneous messages from the new client
407    /// - arguments_fn: This method **MUST** provide the server connect key to the client. The easiest way
408    ///   to do this is to pass in the key as a command line argument. The client must be
409    ///   prepared to read the key from wherever this function puts it.
410    /// - env_vars: Additional environment variables to pass in to the new client
411    /// - current_dir: The current directory of the client on startup. If not specified, this will default
412    ///   to the current directory of the server process.
413    ///   (The caller of `initialize_server_with_client` is the server process.)
414    async fn initialize_server_with_client<SE, F, Fut, A, ENVS, SK, SV>(
415        path_to_exe: SE,
416        message_handler: F,
417        arguments_fn: A,
418        env_vars: ENVS,
419        current_dir: Option<&Path>,
420    ) -> Result<IpcRpc<U>, IpcRpcError>
421    where
422        SE: AsRef<OsStr>,
423        F: Fn(U) -> Fut + Send + Sync + 'static,
424        Fut: Future<Output = Option<U>> + Send,
425        A: FnOnce(ConnectionKey, &mut Command),
426        ENVS: IntoIterator<Item = (SK, SV)>,
427        SK: AsRef<OsStr>,
428        SV: AsRef<OsStr>,
429    {
430        let (server_connect_key, mut server) =
431            IpcRpcServer::initialize_server(message_handler).await?;
432        log::info!(
433            "Starting {} in dir {:?}",
434            path_to_exe.as_ref().to_string_lossy(),
435            current_dir.as_ref().map(|p| p.display()),
436        );
437        let mut command = Command::new(path_to_exe);
438        command
439            .stdin(Stdio::piped())
440            .envs(env_vars)
441            .kill_on_drop(true);
442        if let Some(current_dir) = current_dir {
443            command.current_dir(current_dir);
444        }
445        arguments_fn(server_connect_key, &mut command);
446        let client_process = command.spawn()?;
447        match client_process.id() {
448            Some(pid) => {
449                log::info!("Started client with PID {}", pid);
450            }
451            None => {
452                log::error!("Started a client but it exited immediately")
453            }
454        }
455        server.wait_for_client_to_connect().await.unwrap();
456        Ok(IpcRpc {
457            server,
458            client_process,
459        })
460    }
461
462    /// Sends a message, will give up on receiving a reply after the [`DEFAULT_REPLY_TIMEOUT`](./constant.DEFAULT_REPLY_TIMEOUT.html) has passed.
463    pub fn send(
464        &self,
465        user_message: U,
466    ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
467        self.server.send(user_message)
468    }
469
470    /// Sends a message, waiting the given `timeout` for a reply.
471    pub fn send_timeout(
472        &self,
473        user_message: U,
474        timeout: Duration,
475    ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
476        self.server.send_timeout(user_message, timeout)
477    }
478
479    /// Returns the outcome of automatic schema validation testing. This testing is performed
480    /// on connection initiation.
481    pub async fn schema_validated(&mut self) -> Result<SchemaValidationStatus, IpcRpcError> {
482        self.server.schema_validated().await
483    }
484}
485
486impl<U: UserMessage> Drop for IpcRpc<U> {
487    fn drop(&mut self) {
488        if let Ok(Some(status)) = self.client_process.try_wait() {
489            if !status.success() {
490                log::error!(
491                    "{}Child process exited unsuccessfully, code: {:?}",
492                    self.server.log_prefix,
493                    status.code()
494                )
495            }
496        } else {
497            log::info!("{}Child process still running", self.server.log_prefix);
498        }
499    }
500}
501
502impl<U: UserMessage> Drop for IpcRpcServer<U> {
503    fn drop(&mut self) {
504        if let Err(e) = self.sender.send(InternalMessage {
505            uuid: Uuid::new_v4(),
506            kind: InternalMessageKind::Hangup,
507        }) {
508            log::error!(
509                "{}Error sending hangup message to client: {:?}",
510                self.log_prefix,
511                e
512            );
513        }
514    }
515}
516
/// Builds an [IpcRpc]. This is initialized via [IpcRpc::build()]
///
/// Collects the optional launch settings for the client process; the required ones are passed
/// directly to [`IpcRpcBuilder::finish`].
#[derive(Debug, Clone)]
pub struct IpcRpcBuilder<U: UserMessage> {
    /// Working directory for the client process. `None` leaves it unset when the process is
    /// launched.
    current_dir: Option<PathBuf>,
    /// Additional environment variables passed to the client process, keyed by variable name.
    env_vars: HashMap<OsString, OsString>,
    /// Ties the builder to the user message type `U` without storing a value of it.
    phantom: PhantomData<U>,
}
524
525impl<U: UserMessage> IpcRpcBuilder<U> {
526    fn new() -> Self {
527        Self {
528            current_dir: None,
529            env_vars: HashMap::new(),
530            phantom: PhantomData,
531        }
532    }
533
534    /// Additional environment variable to pass in to the new client.
535    pub fn env<K: Into<OsString>, V: Into<OsString>>(&mut self, key: K, value: V) -> &mut Self {
536        self.env_vars.insert(key.into(), value.into());
537        self
538    }
539
540    /// Additional environment variables to pass in to the new client.
541    pub fn envs<K: Into<OsString>, V: Into<OsString>, I: Iterator<Item = (K, V)>>(
542        &mut self,
543        iter: I,
544    ) -> &mut Self {
545        self.env_vars
546            .extend(iter.map(|(k, v)| (k.into(), v.into())));
547        self
548    }
549
550    /// Sets the current directory for the new client process
551    pub fn current_dir<P: Into<PathBuf>>(&mut self, path: P) -> &mut Self {
552        self.current_dir = Some(path.into());
553        self
554    }
555
556    /// Initializes a server and client, connects the two, then returns a combination structure
557    /// which can be used for the server side of the relationship.
558    ///
559    ///
560    /// - path_to_exe: The path to the exe which is expected to connect to the server on startup
561    /// - message_handler:  A function for handling spontaneous messages from the new client
562    /// - arguments_fn: This method **MUST** provide the server connect key to the client. The easiest way
563    ///   to do this is to pass in the key as a command line argument. The client must be
564    ///   prepared to read the key from wherever this function puts it.
565    pub async fn finish<SE, F, Fut, A>(
566        &mut self,
567        path_to_exe: SE,
568        message_handler: F,
569        arguments_fn: A,
570    ) -> Result<IpcRpc<U>, IpcRpcError>
571    where
572        SE: AsRef<OsStr>,
573        F: Fn(U) -> Fut + Send + Sync + 'static,
574        Fut: Future<Output = Option<U>> + Send,
575        A: FnOnce(ConnectionKey, &mut Command),
576    {
577        IpcRpc::initialize_server_with_client(
578            path_to_exe,
579            message_handler,
580            arguments_fn,
581            self.env_vars
582                .iter()
583                .map(|(k, v)| (k.as_os_str(), v.as_os_str())),
584            self.current_dir.as_deref(),
585        )
586        .await
587    }
588}
589
590impl<U: UserMessage> Default for IpcRpcBuilder<U> {
591    fn default() -> Self {
592        Self::new()
593    }
594}