ipc_rpc/
lib.rs

1//! Inter-Process Communication Remote Procedure Calls
2//!
3//! [![Crates.io](https://img.shields.io/crates/v/ipc-rpc.svg)](https://crates.io/crates/ipc-rpc/)
4//!
5//! Minimum Supported Rust Version: 1.61
6//!
7//! This Rust library is a wrapper over [`servo/ipc-channel`](https://github.com/servo/ipc-channel) that adds many new features.
8//!
9//! - Bi-directional communication by default.
10//! - [Future](https://doc.rust-lang.org/stable/std/future/trait.Future.html) based reply mechanism allows easy and accurate pairing of requests and responses.
11//! - (Optional, enabled by default) Validation on startup that message schemas match between the server and client. No more debugging bad deserializations. Relies on [`schemars`](https://crates.io/crates/schemars).
12//! - Streamlined initialization of IPC channels for common use cases, while still allowing for more flexible and robust dynamic initialization in more unique scenarios.
13//!
14//! Compatible with anything that can run `servo/ipc-channel`, which at time of writing includes
15//!
16//! - Windows
17//! - MacOS
18//! - Linux
19//! - FreeBSD
20//! - OpenBSD
21//!
22//! Additionally, `servo/ipc-channel` supports the following platforms but only while in `inprocess` mode, which is not capable of communication between processes.
23//!
24//! - Android
25//! - iOS
26//! - WASI
27//!
28//! ## tokio
29//!
30//! This crate uses the [`tokio`](https://crates.io/crates/tokio) runtime for executing futures, and it is a hard requirement that users of `ipc-rpc` must use `tokio`. There are no plans to add support for other
31//! executors, but sufficient demand for other executors may change that.
32//!
33//! # Cargo features
34//!
35//! This crate exposes one feature, `message-schema-validation`, which is on by default. This enables functionality related to the [`schemars`](https://crates.io/crates/schemars) crate.
36//! When enabled, the software will attempt to validate the user message schema on initialization of the connection. Failure to validate is not a critical failure, and won't crash the program.
37//! An error will be emitted in the logs, and this status can be retrieved programmatically via many functions, all called `schema_validated()`.
38//!
39//! If you decide that a failure to validate the schema should be a critical failure you can add the following line of code to your program for execution after a connection is established.
40//!
41//! ### Server
42//! ```no_run
43//! # async {
44//! # let (_key, mut server) = ipc_rpc::IpcRpcServer::initialize_server(|_| async {Option::<()>::None}).await.unwrap();
45//! server.schema_validated().await.unwrap().assert_success();
46//! # };
47//! ```
48//!
49//! ### Client
50//! ```no_run
51//! # async {
52//! # use ipc_rpc::{IpcRpcClient, ConnectionKey};
53//! # let mut client = IpcRpcClient::initialize_client(ConnectionKey::from(String::new()), |_| async {Option::<()>::None}).await.unwrap();
54//! client.schema_validated().await.unwrap().assert_success();
55//! # };
56//! ```
57//!
58//! # Limitations
59//!
60//! Much like `servo/ipc-channel`, servers may only serve one client. Overcoming this limitation would require work within `servo/ipc-channel`.
61
62use std::{
63    collections::HashMap,
64    convert::Infallible,
65    env,
66    ffi::OsString,
67    fmt::{self, Debug, Display},
68    future::Future,
69    io,
70    path::Path,
71    pin::Pin,
72    str::FromStr,
73    sync::Arc,
74    task::{Context, Poll},
75};
76
77use futures::{Stream, StreamExt};
78use ipc_channel::ipc::{IpcError, IpcReceiver, IpcSender, TryRecvError};
79use serde::{de::DeserializeOwned, Deserialize, Serialize};
80use thiserror::Error;
81use tokio::{
82    sync::{mpsc, watch},
83    time::{Duration, Instant},
84};
85use uuid::Uuid;
86
87#[cfg(feature = "message-schema-validation")]
88use schemars::{schema_for, JsonSchema, Schema};
89
90mod client;
91pub use client::*;
92mod server;
93pub use server::*;
94
95/// This key can be used to connect to the IPC server it came with, even outside of this process.
96#[derive(Deserialize, Serialize, Debug, Clone)]
97pub struct ConnectionKey(String);
98
99impl From<String> for ConnectionKey {
100    fn from(s: String) -> Self {
101        Self(s)
102    }
103}
104
105impl FromStr for ConnectionKey {
106    type Err = Infallible;
107
108    fn from_str(s: &str) -> Result<Self, Self::Err> {
109        Ok(Self(s.to_string()))
110    }
111}
112
113impl Display for ConnectionKey {
114    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
115        write!(f, "{}", self.0)
116    }
117}
118
119impl From<ConnectionKey> for OsString {
120    fn from(s: ConnectionKey) -> Self {
121        OsString::from(s.0)
122    }
123}
124
125impl From<ConnectionKey> for String {
126    fn from(key: ConnectionKey) -> Self {
127        key.0
128    }
129}
130
131/// The pending reply box maintains a list of messages that are awaiting a reply. Messages are
132/// paired via their uuids, and delivered via the tokio mpsc framework to a future awaiting
133/// the reply. If a reply never arrives eventually a spawned Future responsible for maintenance
134/// of this mail box will clean them out and resolve the futures with a time out.
135type PendingReplyEntry<U> = (
136    Uuid,
137    (
138        mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
139        Instant,
140    ),
141);
142/// Internal protocol message structure. Wraps the actual user message with some structure
143/// helpful to the mail delivery system. You could liken this to a letter's envelope.
144#[derive(Deserialize, Serialize, Debug, Clone)]
145#[serde(bound(deserialize = ""))]
146struct InternalMessage<U: UserMessage> {
147    /// An identifier used for pairing messages with responses. A response to a message should
148    /// carry the same UUID the message itself had. Otherwise, the UUID should be unique.
149    uuid: uuid::Uuid,
150    /// The actual contents of the message.
151    kind: InternalMessageKind<U>,
152}
153
154/// There are many reasons we might need to send a message. This enumerates them. The enum also
155/// contains some messages intended for internal use that downstream users shouldn't concern
156/// themselves with.
157#[derive(Deserialize, Serialize, Debug, Clone)]
158#[serde(bound(deserialize = ""))]
159enum InternalMessageKind<U: UserMessage> {
160    /// Initialize connection. Includes an IpcSender which can be used to communicate with the client.
161    InitConnection(IpcSender<InternalMessage<U>>),
162    /// The conversation has come to a close, it's time to shut down.
163    Hangup,
164    /// Downstream has a message to exchange.
165    UserMessage(U),
166    /// Describes the user message schema for validation purposes
167    UserMessageSchema(String),
168    /// Returned if the user message schema matched
169    UserMessageSchemaOk,
170    /// Returned if the user message schema did not match
171    UserMessageSchemaError { other_schema: String },
172}
173
174/// Errors which can occur with ipc-rpc during operation.
175#[derive(Clone, Debug, Error)]
176pub enum IpcRpcError {
177    #[error("io error")]
178    IoError(#[from] Arc<io::Error>),
179    #[error("internal ipc channel error")]
180    IpcChannelError(#[from] Arc<ipc_channel::Error>),
181    #[error("connection initialization timed out")]
182    ConnectTimeout,
183    #[error("connection established, but initial handshake was not performed properly")]
184    HandshakeFailure,
185    #[error("client already connected")]
186    ClientAlreadyConnected,
187    #[error("peer disconnected")]
188    Disconnected,
189    #[error("time out while waiting for a reply")]
190    ReplyTimeout,
191    #[error("connection dropped pre-emptively")]
192    ConnectionDropped,
193}
194
195impl From<io::Error> for IpcRpcError {
196    fn from(e: io::Error) -> Self {
197        Self::IoError(Arc::new(e))
198    }
199}
200
201impl From<ipc_channel::Error> for IpcRpcError {
202    fn from(e: ipc_channel::Error) -> Self {
203        Self::IpcChannelError(Arc::new(e))
204    }
205}
206
207/// The default timeout used for `send()` style methods. Use `send_timeout()` to use something else.
208///
209/// This may change in the future. The current value is
210/// ```
211/// # use tokio::time::Duration;
212/// # assert_eq!(
213/// # ipc_rpc::DEFAULT_REPLY_TIMEOUT,
214/// Duration::from_secs(5)
215/// # );
216/// ```
217pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
218
219/// Processes incoming messages for both the client and the server. Responsible for distributing
220/// and generating replies.
221async fn process_incoming_mail<
222    Fut: Future<Output = Option<U>> + Send,
223    F: Fn(U) -> Fut + Send + Sync + 'static,
224    U: UserMessage,
225>(
226    is_server: bool,
227    mut pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
228    mut receiver: IpcReceiveStream<InternalMessage<U>>,
229    message_handler: F,
230    response_sender: IpcSender<InternalMessage<U>>,
231    status_sender: watch::Sender<ConnectionStatus>,
232) {
233    let mut pending_replies = HashMap::<
234        Uuid,
235        (
236            mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
237            Instant,
238        ),
239    >::new();
240    let message_handler = Arc::new(message_handler);
241    let log_prefix = get_log_prefix(is_server);
242    log::info!("{}Processing incoming mail!", log_prefix);
243    let mut consecutive_error_count = 0;
244    let mut pending_reply_scheduled_time = Option::<Instant>::None;
245
246    let add_pending_reply = |pending_reply_scheduled_time: &mut Option<Instant>,
247                             pending_replies: &mut HashMap<_, _>,
248                             (key, (sender, timeout))| {
249        *pending_reply_scheduled_time = Some(
250            pending_reply_scheduled_time
251                .map(|t| t.min(timeout))
252                .unwrap_or(timeout),
253        );
254
255        pending_replies.insert(key, (sender, timeout));
256    };
257    loop {
258        // Empty out the pending reply receiver before entering the select below. This guarantees
259        // that all queued reply drop boxes are processed before we start receiving incoming mail.
260        while let Ok(pending_reply) = pending_reply_receiver.try_recv() {
261            add_pending_reply(
262                &mut pending_reply_scheduled_time,
263                &mut pending_replies,
264                pending_reply,
265            );
266        }
267        tokio::select! {
268            true = async { if let Some(t) = pending_reply_scheduled_time { tokio::time::sleep_until(t).await; true } else { false } } => {
269                pending_replies.retain(|_k, v| {
270                    let keep = v.1 > Instant::now();
271                    if !keep {
272                        let _ = v.0.send(Err(IpcRpcError::ReplyTimeout));
273                    }
274                    keep
275                });
276                pending_reply_scheduled_time = pending_replies.values().map(|i| i.1).min();
277            }
278            pending_reply = pending_reply_receiver.recv() => {
279                match pending_reply {
280                    None => {
281                        // Sender got dropped, time to close up shop.
282                        break;
283                    }
284                    Some(pending_reply) => {
285                        add_pending_reply(&mut pending_reply_scheduled_time, &mut pending_replies, pending_reply);
286                    }
287                }
288            },
289            r = receiver.next() => {
290                match r {
291                    None => {
292                        // incoming mail stream ended, time to shut down.
293                        break;
294                    }
295                    Some(Err(e)) => {
296                        if let IpcError::Disconnected = e {
297                            log::info!("{}Peer disconnected.", log_prefix);
298                            break;
299                        } else {
300                            log::error!("{}Error receiving message from peer {:?}", log_prefix, e);
301                            consecutive_error_count += 1;
302                            if consecutive_error_count > 20 {
303                                log::error!("{}Too many consecutive errors, shutting down.", log_prefix);
304                                break;
305                            }
306                        }
307                    }
308                    Some(Ok(message)) => {
309                        consecutive_error_count = 0;
310                        log::debug!("{}Got message! {:?}", log_prefix, message);
311                        let reply = pending_replies.remove(&message.uuid);
312                        if let Some((reply_drop_box, _)) = reply {
313                            log::debug!("{}It's a reply, forwarding!", log_prefix);
314                            // If the end user doesn't want this message that's fine.
315                            let _ = reply_drop_box.send(Ok(message.kind));
316                        } else {
317                            log::debug!("{}It's not a reply, handling!", log_prefix);
318                            let message_uuid = message.uuid;
319                            match message.kind {
320                                InternalMessageKind::UserMessage(user_message) => {
321                                    let message_handler = Arc::clone(&message_handler);
322                                    let response_sender = response_sender.clone();
323                                    tokio::spawn(async move {
324                                        if let Some(m) = message_handler(user_message).await {
325                                            let r = response_sender.send(InternalMessage {
326                                                uuid: message_uuid,
327                                                kind: InternalMessageKind::UserMessage(m),
328                                            });
329                                            if let Err(e) = r {
330                                                log::error!("Failed to send reply {e:?}");
331                                            }
332                                        }
333                                    });
334                                }
335                                #[cfg(feature = "message-schema-validation")]
336                                InternalMessageKind::UserMessageSchema(other_schema) => {
337                                    let my_schema = schema_for!(U);
338                                    let kind = match serde_json::from_str::<Schema>(&other_schema) {
339                                        Ok(other_schema) => {
340                                            if other_schema == my_schema {
341                                                InternalMessageKind::UserMessageSchemaOk
342                                            } else {
343                                                InternalMessageKind::UserMessageSchemaError {
344                                                    other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
345                                                }
346                                            }
347                                        },
348                                        Err(_) => {
349                                            log::error!("Failed to deserialize incoming schema properly, got {other_schema:?}");
350                                            InternalMessageKind::UserMessageSchemaError {
351                                                other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
352                                            }
353                                        }
354                                    };
355                                    let r = response_sender.send(InternalMessage {
356                                        uuid: message_uuid,
357                                        kind,
358                                    });
359                                    if let Err(e) = r {
360                                        log::error!("Failed to send validation response {e:#?}");
361                                    }
362                                }
363                                InternalMessageKind::Hangup => {
364                                    break;
365                                }
366                                _ => {}
367                            }
368                        }
369                    }
370                }
371            }
372        }
373    }
374    let _ = status_sender.send(ConnectionStatus::DisconnectedCleanly);
375}
376
377fn get_log_prefix(is_server: bool) -> String {
378    let first_arg = env::args()
379        .next()
380        .unwrap_or_else(|| String::from("Unknown"));
381    let process = Path::new(&first_arg)
382        .file_name()
383        .unwrap_or_else(|| "Unknown".as_ref())
384        .to_string_lossy();
385    if is_server {
386        format!("{} as Server: ", process)
387    } else {
388        format!("{} as Client: ", process)
389    }
390}
391
392/// Reports the status of the connection between the server and the client
393#[derive(Clone, Debug)]
394pub enum ConnectionStatus {
395    /// The server is waiting for a client to connect. A client never waits for a server to connect.
396    WaitingForClient,
397    /// The connection is active.
398    Connected,
399    /// A shutdown happened, and this was normal. Nothing unexpected happened.
400    DisconnectedCleanly,
401    /// An unexpected shutdown occurred, error contained within.
402    DisconnectError(IpcRpcError),
403}
404
405impl ConnectionStatus {
406    pub fn session_end_result(&self) -> Option<Result<(), IpcRpcError>> {
407        match self {
408            ConnectionStatus::WaitingForClient | ConnectionStatus::Connected => None,
409            ConnectionStatus::DisconnectedCleanly => Some(Ok(())),
410            ConnectionStatus::DisconnectError(e) => Some(Err(e.clone())),
411        }
412    }
413}
414
415/// This future represents a reply to a previous message. It is not a public type. This is to permit
416/// future flexibility in how this interface is implemented.
417struct IpcReplyFuture<U: UserMessage> {
418    receiver: mpsc::UnboundedReceiver<Result<InternalMessageKind<U>, IpcRpcError>>,
419}
420
421impl<U: UserMessage> Future for IpcReplyFuture<U> {
422    type Output = Result<InternalMessageKind<U>, IpcRpcError>;
423
424    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
425        self.receiver.poll_recv(cx).map(|o| match o {
426            Some(m) => m,
427            None => Err(IpcRpcError::ConnectionDropped),
428        })
429    }
430}
431
432struct IpcReceiveStream<T> {
433    receiver: mpsc::UnboundedReceiver<Result<T, IpcError>>,
434}
435
436impl<T> IpcReceiveStream<T>
437where
438    T: Send + for<'de> Deserialize<'de> + Serialize + 'static,
439{
440    pub fn new(ipc_receiver: IpcReceiver<T>) -> Self {
441        let (sender, receiver) = mpsc::unbounded_channel();
442        tokio::task::spawn_blocking(move || loop {
443            match ipc_receiver.try_recv_timeout(Duration::from_millis(250)) {
444                Ok(msg) => {
445                    if sender.send(Ok(msg)).is_err() {
446                        break;
447                    }
448                }
449                Err(TryRecvError::IpcError(e)) => {
450                    if sender.send(Err(e)).is_err() {
451                        break;
452                    }
453                }
454                Err(TryRecvError::Empty) => {
455                    if sender.is_closed() {
456                        break;
457                    }
458                }
459            }
460        });
461        Self { receiver }
462    }
463}
464
465impl<T> Stream for IpcReceiveStream<T> {
466    type Item = Result<T, IpcError>;
467
468    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
469        self.receiver.poll_recv(cx)
470    }
471}
472
473/// Reports the outcome of automatic schema validation testing. This testing is performed
474/// on connection initiation.
475#[derive(Debug, Clone)]
476pub enum SchemaValidationStatus {
477    /// Crate was compiled without default features, so validation does not function.
478    ValidationDisabledAtCompileTime,
479    /// Internal error
480    ValidationNotPerformedProperly,
481    /// Internal error
482    ValidationCommunicationFailed(IpcRpcError),
483    /// Schemas matched, all clear
484    SchemasMatched,
485    /// Schemas didn't match, schemas provided for comparison
486    SchemaMismatch {
487        our_schema: String,
488        their_schema: String,
489    },
490}
491
492impl SchemaValidationStatus {
493    /// Returns true if this is an instance of [`Self::SchemasMatched`], and false otherwise.
494    pub fn is_success(&self) -> bool {
495        matches!(self, SchemaValidationStatus::SchemasMatched)
496    }
497
498    /// Panics if this status is not [`Self::SchemasMatched`], and prints the debug information into the panic.
499    pub fn assert_success(&self) {
500        if !self.is_success() {
501            panic!("ipc-rpc user message schema failed to validate, error {self:#?}");
502        }
503    }
504}
505
506/// A combination trait which represents all of the traits needed for a type to be
507/// used as a message with this library.
508///
509/// # Note on `JsonSchema`
510/// `JsonSchema` is a trait from the [`schemars`](https://crates.io/crates/schemars) crate.
511/// Deriving it for your message allows `ipc-rpc` to perform validation on message schemas
512/// after communication has started. The results of this validation are logged, and stored in
513/// [IpcRpcClient::schema_validated], [IpcRpcServer::schema_validated], and [IpcRpc::schema_validated].
514/// A failed validation will not crash the program.
515///
516/// Despite what this may imply, `ipc-rpc` does not use JSON for messages internally.
517///
518/// If you find yourself unable to derive `JsonSchema` then consider turning off
519/// default features for the `ipc-rpc` crate. Once you do, the `JsonSchema` requirement
520/// will go away.
521#[cfg(feature = "message-schema-validation")]
522pub trait UserMessage:
523    'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
524{
525}
526
527#[cfg(feature = "message-schema-validation")]
528impl<T> UserMessage for T where
529    T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
530{
531}
532
533/// A combination trait which represents all of the traits needed for a type to be
534/// used as a message with this library.
535#[cfg(not(feature = "message-schema-validation"))]
536pub trait UserMessage: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
537
538#[cfg(not(feature = "message-schema-validation"))]
539impl<T> UserMessage for T where T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
540
541/// Invokes an RPC call within the current async runtime.
542///
543/// # Params
544/// - `sender`: A reference to an [IpcRpcServer], [IpcRpcClient], or [IpcRpc]
545/// - `to_send`: The message sent to the remote
546/// - `receiver`: The expected response pattern, followed by a handler for it.
547///
548/// # Returns
549/// The value returned by `receiver`.
550///
551/// # Panics
552/// Panics if the message received from the remote doesn't match the pattern specified
553/// in receiver.
554///
555/// # Example
556///
557/// ```no_run
558/// use serde::{Deserialize, Serialize};
559/// use schemars::JsonSchema;
560/// use std::fmt::Debug;
561///
562/// #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
563/// enum Message {
564///     MakeMeASandwich,
565///     /// The sandwiches are made of i32 around here, don't judge.
566///     ASandwich(Vec<i32>),
567/// }
568///
569/// // Initialize a client
570///
571/// # async {
572/// # use ipc_rpc::{IpcRpcClient, ConnectionKey, rpc_call};
573/// # let mut client = IpcRpcClient::initialize_client(ConnectionKey::from(String::new()), |_| async {Option::<Message>::None}).await.unwrap();
574/// rpc_call!(
575///     sender: client,
576///     to_send: Message::MakeMeASandwich,
577///     receiver: Message::ASandwich(parts) => {
578///         log::info!("I got a sandwich! It contains\n{parts:#?}");
579///     },
580/// )
581/// .unwrap()
582/// # };
583/// ```
584#[macro_export]
585macro_rules! rpc_call {
586    (sender: $sender:expr, to_send: $to_send:expr, receiver: $received:pat_param => $to_do:block,) => {
587        $sender.send($to_send).await.map(|m| match m {
588            $received => $to_do,
589            _ => panic!("rpc_call response didn't match given pattern"),
590        })
591    };
592}
593
594#[cfg(test)]
595mod tests {
596    use tokio::time::timeout;
597
598    use super::*;
599
600    #[cfg(not(feature = "message-schema-validation"))]
601    compile_error!("Tests must be executed with all features on");
602
603    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
604    pub struct IpcProtocolMessage {
605        pub kind: IpcProtocolMessageKind,
606    }
607
608    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
609    pub enum IpcProtocolMessageKind {
610        TestMessage,
611        ClientTestReply,
612        ServerTestReply,
613    }
614
615    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
616    async fn basic_dialogue() {
617        let (server_key, mut server) =
618            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
619                match message.kind {
620                    IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
621                        kind: IpcProtocolMessageKind::ServerTestReply,
622                    }),
623                    _ => None,
624                }
625            })
626            .await
627            .unwrap();
628        let mut client = client::IpcRpcClient::initialize_client(
629            server_key,
630            |message: IpcProtocolMessage| async move {
631                match message.kind {
632                    IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
633                        kind: IpcProtocolMessageKind::ClientTestReply,
634                    }),
635                    _ => None,
636                }
637            },
638        )
639        .await
640        .unwrap();
641        server.schema_validated().await.unwrap().assert_success();
642        client.schema_validated().await.unwrap().assert_success();
643        let client_reply = server
644            .send(IpcProtocolMessage {
645                kind: IpcProtocolMessageKind::TestMessage,
646            })
647            .await;
648        if !matches!(
649            client_reply.as_ref().map(|r| &r.kind),
650            Ok(IpcProtocolMessageKind::ClientTestReply)
651        ) {
652            panic!("client reply was of unexpected type: {:?}", client_reply);
653        }
654        let server_reply = client
655            .send(IpcProtocolMessage {
656                kind: IpcProtocolMessageKind::TestMessage,
657            })
658            .await;
659        if !matches!(
660            server_reply.as_ref().map(|r| &r.kind),
661            Ok(IpcProtocolMessageKind::ServerTestReply)
662        ) {
663            panic!("server reply was of unexpected type: {:?}", server_reply);
664        }
665    }
666
667    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
668    async fn send_without_await() {
669        let (server_success_sender, mut server_success_receiver) = mpsc::unbounded_channel();
670        let (client_success_sender, mut client_success_receiver) = mpsc::unbounded_channel();
671        let (server_key, mut server) =
672            server::IpcRpcServer::initialize_server(move |message: IpcProtocolMessage| {
673                let server_success_sender = server_success_sender.clone();
674                async move {
675                    match message.kind {
676                        IpcProtocolMessageKind::TestMessage => {
677                            server_success_sender.send(()).unwrap()
678                        }
679                        _ => {}
680                    }
681                    None
682                }
683            })
684            .await
685            .unwrap();
686        let mut client = client::IpcRpcClient::initialize_client(
687            server_key,
688            move |message: IpcProtocolMessage| {
689                let client_success_sender = client_success_sender.clone();
690                async move {
691                    match message.kind {
692                        IpcProtocolMessageKind::TestMessage => {
693                            client_success_sender.send(()).unwrap()
694                        }
695                        _ => {}
696                    }
697                    None
698                }
699            },
700        )
701        .await
702        .unwrap();
703        server.schema_validated().await.unwrap().assert_success();
704        client.schema_validated().await.unwrap().assert_success();
705        let _ = server.send(IpcProtocolMessage {
706            kind: IpcProtocolMessageKind::TestMessage,
707        });
708        let _ = client.send(IpcProtocolMessage {
709            kind: IpcProtocolMessageKind::TestMessage,
710        });
711        assert_eq!(
712            timeout(Duration::from_secs(3), server_success_receiver.recv()).await,
713            Ok(Some(()))
714        );
715        assert_eq!(
716            timeout(Duration::from_secs(3), client_success_receiver.recv()).await,
717            Ok(Some(()))
718        );
719    }
720
721    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
722    async fn timeout_test() {
723        let (server_key, mut server) =
724            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
725                match message.kind {
726                    _ => None,
727                }
728            })
729            .await
730            .unwrap();
731        let mut client = client::IpcRpcClient::initialize_client(
732            server_key,
733            |message: IpcProtocolMessage| async move {
734                match message.kind {
735                    _ => None,
736                }
737            },
738        )
739        .await
740        .unwrap();
741        server.schema_validated().await.unwrap().assert_success();
742        client.schema_validated().await.unwrap().assert_success();
743        let wait_start = Instant::now();
744        let client_reply = server
745            .send(IpcProtocolMessage {
746                kind: IpcProtocolMessageKind::TestMessage,
747            })
748            .await;
749        assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
750        if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
751            panic!("client reply was of unexpected type: {:?}", client_reply);
752        }
753        let wait_start = Instant::now();
754        let server_reply = client
755            .send(IpcProtocolMessage {
756                kind: IpcProtocolMessageKind::TestMessage,
757            })
758            .await;
759        assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
760        if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
761            panic!("server reply was of unexpected type: {:?}", server_reply);
762        }
763    }
764
765    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
766    async fn custom_timeout_test() {
767        let (server_key, mut server) =
768            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
769                match message.kind {
770                    _ => None,
771                }
772            })
773            .await
774            .unwrap();
775        let mut client = client::IpcRpcClient::initialize_client(
776            server_key,
777            |message: IpcProtocolMessage| async move {
778                match message.kind {
779                    _ => None,
780                }
781            },
782        )
783        .await
784        .unwrap();
785        server.schema_validated().await.unwrap().assert_success();
786        client.schema_validated().await.unwrap().assert_success();
787        let custom_timeout: Duration = DEFAULT_REPLY_TIMEOUT / 2;
788        let wait_start = Instant::now();
789        let client_reply = server
790            .send_timeout(
791                IpcProtocolMessage {
792                    kind: IpcProtocolMessageKind::TestMessage,
793                },
794                custom_timeout,
795            )
796            .await;
797        assert!(wait_start.elapsed() >= custom_timeout);
798        assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
799        if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
800            panic!("client reply was of unexpected type: {:?}", client_reply);
801        }
802        let wait_start = Instant::now();
803        let server_reply = client
804            .send_timeout(
805                IpcProtocolMessage {
806                    kind: IpcProtocolMessageKind::TestMessage,
807                },
808                custom_timeout,
809            )
810            .await;
811        assert!(wait_start.elapsed() >= custom_timeout);
812        assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
813        if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
814            panic!("server reply was of unexpected type: {:?}", server_reply);
815        }
816    }
817    #[test_log::test]
818    fn server_drop_does_not_hang() {
819        let thread = std::thread::spawn(|| {
820            let runtime = tokio::runtime::Runtime::new().unwrap();
821            runtime.block_on(async {
822                let (_server_key, _server) = server::IpcRpcServer::initialize_server(
823                    |message: IpcProtocolMessage| async move {
824                        match message.kind {
825                            _ => None,
826                        }
827                    },
828                )
829                .await
830                .unwrap();
831            })
832        });
833
834        let start = Instant::now();
835        let timeout = Duration::from_secs(5);
836        while !thread.is_finished() {
837            if start.elapsed() >= timeout {
838                // Server drop is hanging, force quit with non-zero
839                std::process::exit(1);
840            }
841        }
842    }
843
844    // This test checks to see if a task has come to an end, so it must wait for the runtime to drop.
845    // Therefore tokio::test is not an option.
846    #[test_log::test]
847    fn server_disconnect_test() {
848        // We use an empty Arc as a resource inside the message handler to prove the message handler
849        // was dropped. If the message handler was dropped then the cleanup routines were executed.
850        let drop_detector = Arc::new(());
851        let drop_detector_clone = drop_detector.clone();
852        let runtime = tokio::runtime::Runtime::new().unwrap();
853        runtime.block_on(async move {
854            let (server_key, server) = server::IpcRpcServer::initialize_server({
855                let drop_detector_clone = drop_detector_clone.clone();
856                move |message: IpcProtocolMessage| {
857                    let drop_detector_clone = drop_detector_clone.clone();
858                    async move {
859                        match message.kind {
860                            _ => {
861                                // I don't expect this to ever be called, I just need the closure to take
862                                // ownership of the Arc.
863                                let _ = drop_detector_clone.clone();
864                                None
865                            }
866                        }
867                    }
868                }
869            })
870            .await
871            .unwrap();
872            let client = client::IpcRpcClient::initialize_client(
873                server_key,
874                |message: IpcProtocolMessage| async move {
875                    match message.kind {
876                        _ => None,
877                    }
878                },
879            )
880            .await
881            .unwrap();
882            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
883            drop(server);
884            client.wait_for_server_to_disconnect().await.unwrap();
885        });
886        // It's important that the runtime be able to shut down quickly, so we'll assert it
887        // shut down within 3 seconds.
888        let start_shutdown = Instant::now();
889        runtime.shutdown_timeout(Duration::from_secs(5));
890        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
891        assert_eq!(Arc::strong_count(&drop_detector), 1);
892    }
893
894    // This test checks to see if a task has come to an end, so it must wait for the runtime to drop.
895    // Therefore tokio::test is not an option.
896    #[test_log::test]
897    fn client_disconnect_test() {
898        use std::time::Instant;
899
900        // We use an empty Arc as a resource inside the message handler to prove the message handler
901        // was dropped. If the message handler was dropped then the cleanup routines were executed.
902        let drop_detector = Arc::new(());
903        let drop_detector_clone = drop_detector.clone();
904        let runtime = tokio::runtime::Runtime::new().unwrap();
905        runtime.block_on(async move {
906            let (server_key, mut server) =
907                server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
908                    match message.kind {
909                        _ => None,
910                    }
911                })
912                .await
913                .unwrap();
914            let client = client::IpcRpcClient::initialize_client(server_key, {
915                let drop_detector_clone = drop_detector_clone.clone();
916                move |message: IpcProtocolMessage| {
917                    let _drop_detector_clone = drop_detector_clone.clone();
918                    async move {
919                        match message.kind {
920                            _ => None,
921                        }
922                    }
923                }
924            })
925            .await
926            .unwrap();
927            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
928            drop(client);
929            server.wait_for_client_to_disconnect().await.unwrap();
930        });
931        // It's important that the runtime be able to shut down quickly, so we'll assert it
932        // shut down within 3 seconds.
933        let start_shutdown = Instant::now();
934        runtime.shutdown_timeout(Duration::from_secs(5));
935        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
936        assert_eq!(Arc::strong_count(&drop_detector), 1);
937    }
938
939    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
940    async fn rpc_call_macro_test() {
941        let (server_key, mut server) =
942            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
943                match message.kind {
944                    _ => Some(IpcProtocolMessage {
945                        kind: IpcProtocolMessageKind::ServerTestReply,
946                    }),
947                }
948            })
949            .await
950            .unwrap();
951        let mut client = client::IpcRpcClient::initialize_client(
952            server_key,
953            |message: IpcProtocolMessage| async move {
954                match message.kind {
955                    _ => Some(IpcProtocolMessage {
956                        kind: IpcProtocolMessageKind::ClientTestReply,
957                    }),
958                }
959            },
960        )
961        .await
962        .unwrap();
963        server.schema_validated().await.unwrap().assert_success();
964        client.schema_validated().await.unwrap().assert_success();
965        rpc_call!(
966            sender: server,
967            to_send: IpcProtocolMessage {
968                kind: IpcProtocolMessageKind::TestMessage
969            },
970            receiver: IpcProtocolMessage {
971                kind: IpcProtocolMessageKind::ClientTestReply
972            } => {
973
974            },
975        )
976        .unwrap();
977        rpc_call!(
978            sender: client,
979            to_send: IpcProtocolMessage {
980                kind: IpcProtocolMessageKind::TestMessage
981            },
982            receiver: IpcProtocolMessage {
983                kind: IpcProtocolMessageKind::ServerTestReply
984            } => {
985
986            },
987        )
988        .unwrap();
989    }
990}