Skip to main content

ipc_rpc/
lib.rs

1//! Inter-Process Communication Remote Procedure Calls
2//!
3//! [![Crates.io](https://img.shields.io/crates/v/ipc-rpc.svg)](https://crates.io/crates/ipc-rpc/)
4//!
5//! Minimum Supported Rust Version: 1.61
6//!
7//! This Rust library is a wrapper over [`servo/ipc-channel`](https://github.com/servo/ipc-channel) that adds many new features.
8//!
9//! - Bi-directional communication by default.
10//! - [Future](https://doc.rust-lang.org/stable/std/future/trait.Future.html) based reply mechanism allows easy and accurate pairing of requests and responses.
11//! - (Optional, enabled by default) Validation on startup that message schemas match between the server and client. No more debugging bad deserializations. Relies on [`schemars`](https://crates.io/crates/schemars).
12//! - Streamlined initialization of IPC channels for common use cases, while still allowing for more flexible and robust dynamic initialization in more unique scenarios.
13//!
//! Compatible with anything that can run `servo/ipc-channel`, which at the time of writing includes:
15//!
16//! - Windows
//! - macOS
18//! - Linux
19//! - FreeBSD
20//! - OpenBSD
21//!
22//! Additionally, `servo/ipc-channel` supports the following platforms but only while in `inprocess` mode, which is not capable of communication between processes.
23//!
24//! - Android
25//! - iOS
26//! - WASI
27//!
28//! ## tokio
29//!
30//! This crate uses the [`tokio`](https://crates.io/crates/tokio) runtime for executing futures, and it is a hard requirement that users of `ipc-rpc` must use `tokio`. There are no plans to add support for other
31//! executors, but sufficient demand for other executors may change that.
32//!
33//! # Cargo features
34//!
35//! This crate exposes one feature, `message-schema-validation`, which is on by default. This enables functionality related to the [`schemars`](https://crates.io/crates/schemars) crate.
36//! When enabled, the software will attempt to validate the user message schema on initialization of the connection. Failure to validate is not a critical failure, and won't crash the program.
37//! An error will be emitted in the logs, and this status can be retrieved programmatically via many functions, all called `schema_validated()`.
38//!
//! If you decide that a failure to validate the schema should be a critical failure, you can add the following line of code to your program for execution after a connection is established.
40//!
41//! ### Server
42//! ```no_run
43//! # async {
44//! # let (_key, mut server) = ipc_rpc::IpcRpcServer::initialize_server(|_| async {Option::<()>::None}).await.unwrap();
45//! server.schema_validated().await.unwrap().assert_success();
46//! # };
47//! ```
48//!
49//! ### Client
50//! ```no_run
51//! # async {
52//! # use ipc_rpc::{IpcRpcClient, ConnectionKey};
53//! # let mut client = IpcRpcClient::initialize_client(ConnectionKey::from(String::new()), |_| async {Option::<()>::None}).await.unwrap();
54//! client.schema_validated().await.unwrap().assert_success();
55//! # };
56//! ```
57//!
58//! # Limitations
59//!
60//! Much like `servo/ipc-channel`, servers may only serve one client. Overcoming this limitation would require work within `servo/ipc-channel`.
61
62use std::{
63    collections::HashMap,
64    convert::Infallible,
65    env,
66    ffi::OsString,
67    fmt::{self, Debug, Display},
68    future::Future,
69    io,
70    path::Path,
71    pin::Pin,
72    str::FromStr,
73    sync::Arc,
74    task::{Context, Poll},
75};
76
77use futures::{Stream, StreamExt};
78use ipc_channel::{
79    ipc::{IpcReceiver, IpcSender},
80    IpcError, TryRecvError,
81};
82use serde::{de::DeserializeOwned, Deserialize, Serialize};
83use thiserror::Error;
84use tokio::{
85    sync::{mpsc, watch},
86    time::{Duration, Instant},
87};
88use uuid::Uuid;
89
90#[cfg(feature = "message-schema-validation")]
91use schemars::{schema_for, JsonSchema, Schema};
92
93mod client;
94pub use client::*;
95mod server;
96pub use server::*;
97
/// This key can be used to connect to the IPC server it came with, even outside of this process.
///
/// It is a thin wrapper around a `String`: it can be built from a `String` or parsed from a
/// `&str`, printed with `Display`, and converted back into a `String` or an `OsString`.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ConnectionKey(String);
101
102impl From<String> for ConnectionKey {
103    fn from(s: String) -> Self {
104        Self(s)
105    }
106}
107
108impl FromStr for ConnectionKey {
109    type Err = Infallible;
110
111    fn from_str(s: &str) -> Result<Self, Self::Err> {
112        Ok(Self(s.to_string()))
113    }
114}
115
116impl Display for ConnectionKey {
117    fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
118        write!(f, "{}", self.0)
119    }
120}
121
122impl From<ConnectionKey> for OsString {
123    fn from(s: ConnectionKey) -> Self {
124        OsString::from(s.0)
125    }
126}
127
128impl From<ConnectionKey> for String {
129    fn from(key: ConnectionKey) -> Self {
130        key.0
131    }
132}
133
/// One entry for the pending reply box, shaped as `(uuid, (reply_sender, deadline))`.
///
/// The pending reply box maintains a list of messages that are awaiting a reply. Messages are
/// paired via their uuids, and delivered via the tokio mpsc framework to a future awaiting
/// the reply. If a reply never arrives, a spawned Future responsible for maintenance
/// of this mail box will eventually clean the entry out and resolve the future with a time out.
type PendingReplyEntry<U> = (
    // Identifier of the outgoing message; a reply is expected to carry the same uuid.
    Uuid,
    (
        // Where the reply (or an error such as a time out) is delivered.
        mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
        // Point in time after which the entry is considered timed out.
        Instant,
    ),
);
145/// Internal protocol message structure. Wraps the actual user message with some structure
146/// helpful to the mail delivery system. You could liken this to a letter's envelope.
147#[derive(Deserialize, Serialize, Debug, Clone)]
148#[serde(bound(deserialize = ""))]
149struct InternalMessage<U: UserMessage> {
150    /// An identifier used for pairing messages with responses. A response to a message should
151    /// carry the same UUID the message itself had. Otherwise, the UUID should be unique.
152    uuid: uuid::Uuid,
153    /// The actual contents of the message.
154    kind: InternalMessageKind<U>,
155}
156
/// There are many reasons we might need to send a message. This enumerates them. The enum also
/// contains some messages intended for internal use that downstream users shouldn't concern
/// themselves with.
///
/// `U` is the downstream user's own message type, carried by [`Self::UserMessage`].
#[derive(Deserialize, Serialize, Debug, Clone)]
// The empty `deserialize` bound suppresses the bound serde would otherwise generate for `U`;
// presumably the `UserMessage` bound on `U` already supplies what deserialization needs.
#[serde(bound(deserialize = ""))]
enum InternalMessageKind<U: UserMessage> {
    /// Initialize connection. Includes an IpcSender which can be used to communicate with the client.
    InitConnection(IpcSender<InternalMessage<U>>),
    /// The conversation has come to a close, it's time to shut down.
    Hangup,
    /// Downstream has a message to exchange.
    UserMessage(U),
    /// Describes the user message schema for validation purposes. The schema is carried as a
    /// JSON string.
    UserMessageSchema(String),
    /// Returned if the user message schema matched
    UserMessageSchemaOk,
    /// Returned if the user message schema did not match. `other_schema` holds the replying
    /// side's own schema, serialized as JSON.
    UserMessageSchemaError { other_schema: String },
}
176
/// Errors which can occur with ipc-rpc during operation.
///
/// The wrapped `io::Error` and `IpcError` values are held behind an `Arc`, presumably so that
/// this enum can derive `Clone` even though those error types cannot be cloned directly.
#[derive(Clone, Debug, Error)]
pub enum IpcRpcError {
    /// An I/O error occurred.
    #[error("io error")]
    IoError(#[from] Arc<io::Error>),
    /// The underlying `ipc-channel` machinery reported an error.
    #[error("internal ipc channel error")]
    IpcChannelError(#[from] Arc<IpcError>),
    /// Connection initialization timed out.
    #[error("connection initialization timed out")]
    ConnectTimeout,
    /// A connection was established, but the initial handshake was not performed properly.
    #[error("connection established, but initial handshake was not performed properly")]
    HandshakeFailure,
    /// A client is already connected.
    #[error("client already connected")]
    ClientAlreadyConnected,
    /// The peer disconnected.
    #[error("peer disconnected")]
    Disconnected,
    /// No reply arrived before the reply deadline passed.
    #[error("time out while waiting for a reply")]
    ReplyTimeout,
    /// The channel a reply would have been delivered on was closed before any reply arrived.
    #[error("connection dropped pre-emptively")]
    ConnectionDropped,
}
197
198impl From<io::Error> for IpcRpcError {
199    fn from(e: io::Error) -> Self {
200        Self::IoError(Arc::new(e))
201    }
202}
203
204impl From<IpcError> for IpcRpcError {
205    fn from(e: IpcError) -> Self {
206        Self::IpcChannelError(Arc::new(e))
207    }
208}
209
/// The default amount of time `send()` style methods wait for a reply. Use `send_timeout()`
/// to wait for a different amount of time.
///
/// This may change in the future. The current value is
/// ```
/// # use tokio::time::Duration;
/// # assert_eq!(
/// # ipc_rpc::DEFAULT_REPLY_TIMEOUT,
/// Duration::from_secs(5)
/// # );
/// ```
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
221
222/// Processes incoming messages for both the client and the server. Responsible for distributing
223/// and generating replies.
224async fn process_incoming_mail<
225    Fut: Future<Output = Option<U>> + Send,
226    F: Fn(U) -> Fut + Send + Sync + 'static,
227    U: UserMessage,
228>(
229    is_server: bool,
230    mut pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
231    mut receiver: IpcReceiveStream<InternalMessage<U>>,
232    message_handler: F,
233    response_sender: IpcSender<InternalMessage<U>>,
234    status_sender: watch::Sender<ConnectionStatus>,
235) {
236    let mut pending_replies = HashMap::<
237        Uuid,
238        (
239            mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
240            Instant,
241        ),
242    >::new();
243    let message_handler = Arc::new(message_handler);
244    let log_prefix = get_log_prefix(is_server);
245    log::info!("{}Processing incoming mail!", log_prefix);
246    let mut consecutive_error_count = 0;
247    let mut pending_reply_scheduled_time = Option::<Instant>::None;
248
249    let add_pending_reply = |pending_reply_scheduled_time: &mut Option<Instant>,
250                             pending_replies: &mut HashMap<_, _>,
251                             (key, (sender, timeout))| {
252        *pending_reply_scheduled_time = Some(
253            pending_reply_scheduled_time
254                .map(|t| t.min(timeout))
255                .unwrap_or(timeout),
256        );
257
258        pending_replies.insert(key, (sender, timeout));
259    };
260    loop {
261        // Empty out the pending reply receiver before entering the select below. This guarantees
262        // that all queued reply drop boxes are processed before we start receiving incoming mail.
263        while let Ok(pending_reply) = pending_reply_receiver.try_recv() {
264            add_pending_reply(
265                &mut pending_reply_scheduled_time,
266                &mut pending_replies,
267                pending_reply,
268            );
269        }
270        tokio::select! {
271            true = async { if let Some(t) = pending_reply_scheduled_time { tokio::time::sleep_until(t).await; true } else { false } } => {
272                pending_replies.retain(|_k, v| {
273                    let keep = v.1 > Instant::now();
274                    if !keep {
275                        let _ = v.0.send(Err(IpcRpcError::ReplyTimeout));
276                    }
277                    keep
278                });
279                pending_reply_scheduled_time = pending_replies.values().map(|i| i.1).min();
280            }
281            pending_reply = pending_reply_receiver.recv() => {
282                match pending_reply {
283                    None => {
284                        // Sender got dropped, time to close up shop.
285                        break;
286                    }
287                    Some(pending_reply) => {
288                        add_pending_reply(&mut pending_reply_scheduled_time, &mut pending_replies, pending_reply);
289                    }
290                }
291            },
292            r = receiver.next() => {
293                match r {
294                    None => {
295                        // incoming mail stream ended, time to shut down.
296                        break;
297                    }
298                    Some(Err(e)) => {
299                        if let IpcError::Disconnected = e {
300                            log::info!("{}Peer disconnected.", log_prefix);
301                            break;
302                        } else {
303                            log::error!("{}Error receiving message from peer {:?}", log_prefix, e);
304                            consecutive_error_count += 1;
305                            if consecutive_error_count > 20 {
306                                log::error!("{}Too many consecutive errors, shutting down.", log_prefix);
307                                break;
308                            }
309                        }
310                    }
311                    Some(Ok(message)) => {
312                        consecutive_error_count = 0;
313                        log::debug!("{}Got message! {:?}", log_prefix, message);
314                        let reply = pending_replies.remove(&message.uuid);
315                        if let Some((reply_drop_box, _)) = reply {
316                            log::debug!("{}It's a reply, forwarding!", log_prefix);
317                            // If the end user doesn't want this message that's fine.
318                            let _ = reply_drop_box.send(Ok(message.kind));
319                        } else {
320                            log::debug!("{}It's not a reply, handling!", log_prefix);
321                            let message_uuid = message.uuid;
322                            match message.kind {
323                                InternalMessageKind::UserMessage(user_message) => {
324                                    let message_handler = Arc::clone(&message_handler);
325                                    let response_sender = response_sender.clone();
326                                    tokio::spawn(async move {
327                                        if let Some(m) = message_handler(user_message).await {
328                                            let r = response_sender.send(InternalMessage {
329                                                uuid: message_uuid,
330                                                kind: InternalMessageKind::UserMessage(m),
331                                            });
332                                            if let Err(e) = r {
333                                                log::error!("Failed to send reply {e:?}");
334                                            }
335                                        }
336                                    });
337                                }
338                                #[cfg(feature = "message-schema-validation")]
339                                InternalMessageKind::UserMessageSchema(other_schema) => {
340                                    let my_schema = schema_for!(U);
341                                    let kind = match serde_json::from_str::<Schema>(&other_schema) {
342                                        Ok(other_schema) => {
343                                            if other_schema == my_schema {
344                                                InternalMessageKind::UserMessageSchemaOk
345                                            } else {
346                                                InternalMessageKind::UserMessageSchemaError {
347                                                    other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
348                                                }
349                                            }
350                                        },
351                                        Err(_) => {
352                                            log::error!("Failed to deserialize incoming schema properly, got {other_schema:?}");
353                                            InternalMessageKind::UserMessageSchemaError {
354                                                other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
355                                            }
356                                        }
357                                    };
358                                    let r = response_sender.send(InternalMessage {
359                                        uuid: message_uuid,
360                                        kind,
361                                    });
362                                    if let Err(e) = r {
363                                        log::error!("Failed to send validation response {e:#?}");
364                                    }
365                                }
366                                InternalMessageKind::Hangup => {
367                                    break;
368                                }
369                                _ => {}
370                            }
371                        }
372                    }
373                }
374            }
375        }
376    }
377    let _ = status_sender.send(ConnectionStatus::DisconnectedCleanly);
378}
379
/// Builds the prefix put in front of this module's log lines, in the form
/// `"<process> as Server: "` or `"<process> as Client: "`.
///
/// `<process>` is the file name component of the first command line argument. It falls back
/// to `"Unknown"` when there is no first argument or it has no file name component.
///
/// # Params
/// - `is_server`: Selects the `Server` label when true, and the `Client` label otherwise.
fn get_log_prefix(is_server: bool) -> String {
    // `env::args()` panics when an argument is not valid Unicode. A logging helper must not
    // bring the process down over that, so read the raw argument and convert it lossily.
    let first_arg = env::args_os().next();
    let process = first_arg
        .as_deref()
        .and_then(|arg| Path::new(arg).file_name())
        .map(|name| name.to_string_lossy().into_owned())
        .unwrap_or_else(|| String::from("Unknown"));
    let role = if is_server { "Server" } else { "Client" };
    format!("{} as {}: ", process, role)
}
394
/// Reports the status of the connection between the server and the client.
///
/// The first two variants describe a session that has not ended; the last two describe how a
/// session ended.
#[derive(Clone, Debug)]
pub enum ConnectionStatus {
    /// The server is waiting for a client to connect. A client never waits for a server to connect.
    WaitingForClient,
    /// The connection is active.
    Connected,
    /// A shutdown happened, and this was normal. Nothing unexpected happened.
    DisconnectedCleanly,
    /// An unexpected shutdown occurred, error contained within.
    DisconnectError(IpcRpcError),
}
407
408impl ConnectionStatus {
409    pub fn session_end_result(&self) -> Option<Result<(), IpcRpcError>> {
410        match self {
411            ConnectionStatus::WaitingForClient | ConnectionStatus::Connected => None,
412            ConnectionStatus::DisconnectedCleanly => Some(Ok(())),
413            ConnectionStatus::DisconnectError(e) => Some(Err(e.clone())),
414        }
415    }
416}
417
/// This future represents a reply to a previous message. It is not a public type. This is to permit
/// future flexibility in how this interface is implemented.
///
/// It resolves with whatever arrives on `receiver`, or with
/// [`IpcRpcError::ConnectionDropped`] if the channel closes without delivering anything.
struct IpcReplyFuture<U: UserMessage> {
    /// Channel on which the reply, or an error, is delivered.
    receiver: mpsc::UnboundedReceiver<Result<InternalMessageKind<U>, IpcRpcError>>,
}
423
424impl<U: UserMessage> Future for IpcReplyFuture<U> {
425    type Output = Result<InternalMessageKind<U>, IpcRpcError>;
426
427    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
428        self.receiver.poll_recv(cx).map(|o| match o {
429            Some(m) => m,
430            None => Err(IpcRpcError::ConnectionDropped),
431        })
432    }
433}
434
/// Exposes the messages of an `IpcReceiver` as an async `Stream`.
///
/// A blocking task started in `new` reads from the `IpcReceiver` and forwards each message
/// or receive error through an unbounded tokio channel, which this stream then polls.
struct IpcReceiveStream<T> {
    /// Receiving half of the channel fed by the blocking task.
    receiver: mpsc::UnboundedReceiver<Result<T, IpcError>>,
}
438
439impl<T> IpcReceiveStream<T>
440where
441    T: Send + for<'de> Deserialize<'de> + Serialize + 'static,
442{
443    pub fn new(ipc_receiver: IpcReceiver<T>) -> Self {
444        let (sender, receiver) = mpsc::unbounded_channel();
445        tokio::task::spawn_blocking(move || loop {
446            match ipc_receiver.try_recv_timeout(Duration::from_millis(250)) {
447                Ok(msg) => {
448                    if sender.send(Ok(msg)).is_err() {
449                        break;
450                    }
451                }
452                Err(TryRecvError::IpcError(e)) => {
453                    if sender.send(Err(e)).is_err() {
454                        break;
455                    }
456                }
457                Err(TryRecvError::Empty) => {
458                    if sender.is_closed() {
459                        break;
460                    }
461                }
462            }
463        });
464        Self { receiver }
465    }
466}
467
468impl<T> Stream for IpcReceiveStream<T> {
469    type Item = Result<T, IpcError>;
470
471    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
472        self.receiver.poll_recv(cx)
473    }
474}
475
/// Reports the outcome of automatic schema validation testing. This testing is performed
/// on connection initiation.
///
/// Only [`Self::SchemasMatched`] counts as success; see [`Self::is_success`].
#[derive(Debug, Clone)]
pub enum SchemaValidationStatus {
    /// Crate was compiled without default features, so validation does not function.
    ValidationDisabledAtCompileTime,
    /// Internal error: validation was not performed properly.
    ValidationNotPerformedProperly,
    /// Internal error: communicating with the peer for validation failed, error contained within.
    ValidationCommunicationFailed(IpcRpcError),
    /// Schemas matched, all clear
    SchemasMatched,
    /// Schemas didn't match, schemas provided for comparison
    SchemaMismatch {
        /// The schema of this side's user message type.
        our_schema: String,
        /// The schema reported by the peer.
        their_schema: String,
    },
}
494
495impl SchemaValidationStatus {
496    /// Returns true if this is an instance of [`Self::SchemasMatched`], and false otherwise.
497    pub fn is_success(&self) -> bool {
498        matches!(self, SchemaValidationStatus::SchemasMatched)
499    }
500
501    /// Panics if this status is not [`Self::SchemasMatched`], and prints the debug information into the panic.
502    pub fn assert_success(&self) {
503        if !self.is_success() {
504            panic!("ipc-rpc user message schema failed to validate, error {self:#?}");
505        }
506    }
507}
508
/// A combination trait which represents all of the traits needed for a type to be
/// used as a message with this library.
///
/// There is nothing to implement by hand: a blanket implementation covers every type that
/// satisfies the bounds.
///
/// # Note on `JsonSchema`
/// `JsonSchema` is a trait from the [`schemars`](https://crates.io/crates/schemars) crate.
/// Deriving it for your message allows `ipc-rpc` to perform validation on message schemas
/// after communication has started. The results of this validation are logged, and stored in
/// [IpcRpcClient::schema_validated], [IpcRpcServer::schema_validated], and [IpcRpc::schema_validated].
/// A failed validation will not crash the program.
///
/// Despite what this may imply, `ipc-rpc` does not use JSON for messages internally.
///
/// If you find yourself unable to derive `JsonSchema` then consider turning off
/// default features for the `ipc-rpc` crate. Once you do, the `JsonSchema` requirement
/// will go away.
#[cfg(feature = "message-schema-validation")]
pub trait UserMessage:
    'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
{
}

// Blanket implementation: any type meeting the bounds is automatically a `UserMessage`.
#[cfg(feature = "message-schema-validation")]
impl<T> UserMessage for T where
    T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
{
}
535
/// A combination trait which represents all of the traits needed for a type to be
/// used as a message with this library.
///
/// This is the variant used when the `message-schema-validation` feature is turned off, so
/// it carries no `JsonSchema` requirement. There is nothing to implement by hand: a blanket
/// implementation covers every type that satisfies the bounds.
#[cfg(not(feature = "message-schema-validation"))]
pub trait UserMessage: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}

// Blanket implementation: any type meeting the bounds is automatically a `UserMessage`.
#[cfg(not(feature = "message-schema-validation"))]
impl<T> UserMessage for T where T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
543
/// Invokes an RPC call within the current async runtime.
///
/// The macro expands to an `.await` expression, so it must be used inside an `async` context.
/// The comma after the `receiver` handler block is optional.
///
/// # Params
/// - `sender`: A reference to an [IpcRpcServer], [IpcRpcClient], or [IpcRpc]
/// - `to_send`: The message sent to the remote
/// - `receiver`: The expected response pattern, followed by a handler for it.
///
/// # Returns
/// The result of `sender.send(to_send)`, with the reply replaced by the value the `receiver`
/// handler produced. If the send fails, the error is passed through and the handler
/// does not run.
///
/// # Panics
/// Panics if the message received from the remote doesn't match the pattern specified
/// in receiver.
///
/// # Example
///
/// ```no_run
/// use serde::{Deserialize, Serialize};
/// use schemars::JsonSchema;
/// use std::fmt::Debug;
///
/// #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
/// enum Message {
///     MakeMeASandwich,
///     /// The sandwiches are made of i32 around here, don't judge.
///     ASandwich(Vec<i32>),
/// }
///
/// // Initialize a client
///
/// # async {
/// # use ipc_rpc::{IpcRpcClient, ConnectionKey, rpc_call};
/// # let mut client = IpcRpcClient::initialize_client(ConnectionKey::from(String::new()), |_| async {Option::<Message>::None}).await.unwrap();
/// rpc_call!(
///     sender: client,
///     to_send: Message::MakeMeASandwich,
///     receiver: Message::ASandwich(parts) => {
///         log::info!("I got a sandwich! It contains\n{parts:#?}");
///     },
/// )
/// .unwrap()
/// # };
/// ```
#[macro_export]
macro_rules! rpc_call {
    // `$(,)?` accepts the invocation with or without a comma after the handler block.
    (sender: $sender:expr, to_send: $to_send:expr, receiver: $received:pat_param => $to_do:block $(,)?) => {
        $sender.send($to_send).await.map(|m| match m {
            $received => $to_do,
            _ => panic!("rpc_call response didn't match given pattern"),
        })
    };
}
596
597#[cfg(test)]
598mod tests {
599    use tokio::time::timeout;
600
601    use super::*;
602
603    #[cfg(not(feature = "message-schema-validation"))]
604    compile_error!("Tests must be executed with all features on");
605
    /// User message type exchanged between the server and the client in these tests.
    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
    pub struct IpcProtocolMessage {
        /// Tells which message this is.
        pub kind: IpcProtocolMessageKind,
    }
610
    /// The messages the tests send back and forth.
    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
    pub enum IpcProtocolMessageKind {
        /// Request sent by the tests to either side.
        TestMessage,
        /// What the client-side handler answers a `TestMessage` with in `basic_dialogue`.
        ClientTestReply,
        /// What the server-side handler answers a `TestMessage` with in `basic_dialogue`.
        ServerTestReply,
    }
617
618    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
619    async fn basic_dialogue() {
620        let (server_key, mut server) =
621            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
622                match message.kind {
623                    IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
624                        kind: IpcProtocolMessageKind::ServerTestReply,
625                    }),
626                    _ => None,
627                }
628            })
629            .await
630            .unwrap();
631        let mut client = client::IpcRpcClient::initialize_client(
632            server_key,
633            |message: IpcProtocolMessage| async move {
634                match message.kind {
635                    IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
636                        kind: IpcProtocolMessageKind::ClientTestReply,
637                    }),
638                    _ => None,
639                }
640            },
641        )
642        .await
643        .unwrap();
644        server.schema_validated().await.unwrap().assert_success();
645        client.schema_validated().await.unwrap().assert_success();
646        let client_reply = server
647            .send(IpcProtocolMessage {
648                kind: IpcProtocolMessageKind::TestMessage,
649            })
650            .await;
651        if !matches!(
652            client_reply.as_ref().map(|r| &r.kind),
653            Ok(IpcProtocolMessageKind::ClientTestReply)
654        ) {
655            panic!("client reply was of unexpected type: {:?}", client_reply);
656        }
657        let server_reply = client
658            .send(IpcProtocolMessage {
659                kind: IpcProtocolMessageKind::TestMessage,
660            })
661            .await;
662        if !matches!(
663            server_reply.as_ref().map(|r| &r.kind),
664            Ok(IpcProtocolMessageKind::ServerTestReply)
665        ) {
666            panic!("server reply was of unexpected type: {:?}", server_reply);
667        }
668    }
669
670    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
671    async fn send_without_await() {
672        let (server_success_sender, mut server_success_receiver) = mpsc::unbounded_channel();
673        let (client_success_sender, mut client_success_receiver) = mpsc::unbounded_channel();
674        let (server_key, mut server) =
675            server::IpcRpcServer::initialize_server(move |message: IpcProtocolMessage| {
676                let server_success_sender = server_success_sender.clone();
677                async move {
678                    match message.kind {
679                        IpcProtocolMessageKind::TestMessage => {
680                            server_success_sender.send(()).unwrap()
681                        }
682                        _ => {}
683                    }
684                    None
685                }
686            })
687            .await
688            .unwrap();
689        let mut client = client::IpcRpcClient::initialize_client(
690            server_key,
691            move |message: IpcProtocolMessage| {
692                let client_success_sender = client_success_sender.clone();
693                async move {
694                    match message.kind {
695                        IpcProtocolMessageKind::TestMessage => {
696                            client_success_sender.send(()).unwrap()
697                        }
698                        _ => {}
699                    }
700                    None
701                }
702            },
703        )
704        .await
705        .unwrap();
706        server.schema_validated().await.unwrap().assert_success();
707        client.schema_validated().await.unwrap().assert_success();
708        let _ = server.send(IpcProtocolMessage {
709            kind: IpcProtocolMessageKind::TestMessage,
710        });
711        let _ = client.send(IpcProtocolMessage {
712            kind: IpcProtocolMessageKind::TestMessage,
713        });
714        assert_eq!(
715            timeout(Duration::from_secs(3), server_success_receiver.recv()).await,
716            Ok(Some(()))
717        );
718        assert_eq!(
719            timeout(Duration::from_secs(3), client_success_receiver.recv()).await,
720            Ok(Some(()))
721        );
722    }
723
724    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
725    async fn timeout_test() {
726        let (server_key, mut server) =
727            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
728                match message.kind {
729                    _ => None,
730                }
731            })
732            .await
733            .unwrap();
734        let mut client = client::IpcRpcClient::initialize_client(
735            server_key,
736            |message: IpcProtocolMessage| async move {
737                match message.kind {
738                    _ => None,
739                }
740            },
741        )
742        .await
743        .unwrap();
744        server.schema_validated().await.unwrap().assert_success();
745        client.schema_validated().await.unwrap().assert_success();
746        let wait_start = Instant::now();
747        let client_reply = server
748            .send(IpcProtocolMessage {
749                kind: IpcProtocolMessageKind::TestMessage,
750            })
751            .await;
752        assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
753        if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
754            panic!("client reply was of unexpected type: {:?}", client_reply);
755        }
756        let wait_start = Instant::now();
757        let server_reply = client
758            .send(IpcProtocolMessage {
759                kind: IpcProtocolMessageKind::TestMessage,
760            })
761            .await;
762        assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
763        if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
764            panic!("server reply was of unexpected type: {:?}", server_reply);
765        }
766    }
767
768    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
769    async fn custom_timeout_test() {
770        let (server_key, mut server) =
771            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
772                match message.kind {
773                    _ => None,
774                }
775            })
776            .await
777            .unwrap();
778        let mut client = client::IpcRpcClient::initialize_client(
779            server_key,
780            |message: IpcProtocolMessage| async move {
781                match message.kind {
782                    _ => None,
783                }
784            },
785        )
786        .await
787        .unwrap();
788        server.schema_validated().await.unwrap().assert_success();
789        client.schema_validated().await.unwrap().assert_success();
790        let custom_timeout: Duration = DEFAULT_REPLY_TIMEOUT / 2;
791        let wait_start = Instant::now();
792        let client_reply = server
793            .send_timeout(
794                IpcProtocolMessage {
795                    kind: IpcProtocolMessageKind::TestMessage,
796                },
797                custom_timeout,
798            )
799            .await;
800        assert!(wait_start.elapsed() >= custom_timeout);
801        assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
802        if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
803            panic!("client reply was of unexpected type: {:?}", client_reply);
804        }
805        let wait_start = Instant::now();
806        let server_reply = client
807            .send_timeout(
808                IpcProtocolMessage {
809                    kind: IpcProtocolMessageKind::TestMessage,
810                },
811                custom_timeout,
812            )
813            .await;
814        assert!(wait_start.elapsed() >= custom_timeout);
815        assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
816        if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
817            panic!("server reply was of unexpected type: {:?}", server_reply);
818        }
819    }
820    #[test_log::test]
821    fn server_drop_does_not_hang() {
822        let thread = std::thread::spawn(|| {
823            let runtime = tokio::runtime::Runtime::new().unwrap();
824            runtime.block_on(async {
825                let (_server_key, _server) = server::IpcRpcServer::initialize_server(
826                    |message: IpcProtocolMessage| async move {
827                        match message.kind {
828                            _ => None,
829                        }
830                    },
831                )
832                .await
833                .unwrap();
834            })
835        });
836
837        let start = Instant::now();
838        let timeout = Duration::from_secs(5);
839        while !thread.is_finished() {
840            if start.elapsed() >= timeout {
841                // Server drop is hanging, force quit with non-zero
842                std::process::exit(1);
843            }
844        }
845    }
846
847    // This test checks to see if a task has come to an end, so it must wait for the runtime to drop.
848    // Therefore tokio::test is not an option.
849    #[test_log::test]
850    fn server_disconnect_test() {
851        // We use an empty Arc as a resource inside the message handler to prove the message handler
852        // was dropped. If the message handler was dropped then the cleanup routines were executed.
853        let drop_detector = Arc::new(());
854        let drop_detector_clone = drop_detector.clone();
855        let runtime = tokio::runtime::Runtime::new().unwrap();
856        runtime.block_on(async move {
857            let (server_key, server) = server::IpcRpcServer::initialize_server({
858                let drop_detector_clone = drop_detector_clone.clone();
859                move |message: IpcProtocolMessage| {
860                    let drop_detector_clone = drop_detector_clone.clone();
861                    async move {
862                        match message.kind {
863                            _ => {
864                                // I don't expect this to ever be called, I just need the closure to take
865                                // ownership of the Arc.
866                                let _ = drop_detector_clone.clone();
867                                None
868                            }
869                        }
870                    }
871                }
872            })
873            .await
874            .unwrap();
875            let client = client::IpcRpcClient::initialize_client(
876                server_key,
877                |message: IpcProtocolMessage| async move {
878                    match message.kind {
879                        _ => None,
880                    }
881                },
882            )
883            .await
884            .unwrap();
885            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
886            drop(server);
887            client.wait_for_server_to_disconnect().await.unwrap();
888        });
889        // It's important that the runtime be able to shut down quickly, so we'll assert it
890        // shut down within 3 seconds.
891        let start_shutdown = Instant::now();
892        runtime.shutdown_timeout(Duration::from_secs(5));
893        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
894        assert_eq!(Arc::strong_count(&drop_detector), 1);
895    }
896
897    // This test checks to see if a task has come to an end, so it must wait for the runtime to drop.
898    // Therefore tokio::test is not an option.
899    #[test_log::test]
900    fn client_disconnect_test() {
901        use std::time::Instant;
902
903        // We use an empty Arc as a resource inside the message handler to prove the message handler
904        // was dropped. If the message handler was dropped then the cleanup routines were executed.
905        let drop_detector = Arc::new(());
906        let drop_detector_clone = drop_detector.clone();
907        let runtime = tokio::runtime::Runtime::new().unwrap();
908        runtime.block_on(async move {
909            let (server_key, mut server) =
910                server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
911                    match message.kind {
912                        _ => None,
913                    }
914                })
915                .await
916                .unwrap();
917            let client = client::IpcRpcClient::initialize_client(server_key, {
918                let drop_detector_clone = drop_detector_clone.clone();
919                move |message: IpcProtocolMessage| {
920                    let _drop_detector_clone = drop_detector_clone.clone();
921                    async move {
922                        match message.kind {
923                            _ => None,
924                        }
925                    }
926                }
927            })
928            .await
929            .unwrap();
930            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
931            drop(client);
932            server.wait_for_client_to_disconnect().await.unwrap();
933        });
934        // It's important that the runtime be able to shut down quickly, so we'll assert it
935        // shut down within 3 seconds.
936        let start_shutdown = Instant::now();
937        runtime.shutdown_timeout(Duration::from_secs(5));
938        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
939        assert_eq!(Arc::strong_count(&drop_detector), 1);
940    }
941
942    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
943    async fn rpc_call_macro_test() {
944        let (server_key, mut server) =
945            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
946                match message.kind {
947                    _ => Some(IpcProtocolMessage {
948                        kind: IpcProtocolMessageKind::ServerTestReply,
949                    }),
950                }
951            })
952            .await
953            .unwrap();
954        let mut client = client::IpcRpcClient::initialize_client(
955            server_key,
956            |message: IpcProtocolMessage| async move {
957                match message.kind {
958                    _ => Some(IpcProtocolMessage {
959                        kind: IpcProtocolMessageKind::ClientTestReply,
960                    }),
961                }
962            },
963        )
964        .await
965        .unwrap();
966        server.schema_validated().await.unwrap().assert_success();
967        client.schema_validated().await.unwrap().assert_success();
968        rpc_call!(
969            sender: server,
970            to_send: IpcProtocolMessage {
971                kind: IpcProtocolMessageKind::TestMessage
972            },
973            receiver: IpcProtocolMessage {
974                kind: IpcProtocolMessageKind::ClientTestReply
975            } => {
976
977            },
978        )
979        .unwrap();
980        rpc_call!(
981            sender: client,
982            to_send: IpcProtocolMessage {
983                kind: IpcProtocolMessageKind::TestMessage
984            },
985            receiver: IpcProtocolMessage {
986                kind: IpcProtocolMessageKind::ServerTestReply
987            } => {
988
989            },
990        )
991        .unwrap();
992    }
993}