ipc_rpc/
lib.rs

1//! Inter-Process Communication Remote Procedure Calls
2//!
3//! [![Crates.io](https://img.shields.io/crates/v/ipc-rpc.svg)](https://crates.io/crates/ipc-rpc/)
4//!
5//! This Rust library is a wrapper over [`servo/ipc-channel`](https://github.com/servo/ipc-channel) that adds many new features.
6//!
7//! - Bi-directional communication by default.
8//! - [Future](https://doc.rust-lang.org/stable/std/future/trait.Future.html) based reply mechanism allows easy and accurate pairing of requests and responses.
9//! - (Optional, enabled by default) Validation on startup that message schemas match between the server and client. No more debugging bad deserializations. Relies on [`schemars`](https://crates.io/crates/schemars).
10//! - Streamlined initialization of IPC channels for common use cases, while still allowing for more flexible and robust dynamic initialization in more unique scenarios.
11//!
12//! Compatible with anything that can run `servo/ipc-channel`, which at time of writing includes
13//!
14//! - Windows
15//! - MacOS
16//! - Linux
17//! - FreeBSD
18//! - OpenBSD
19//!
20//! Additionally, `servo/ipc-channel` supports the following platforms but only while in `inprocess` mode, which is not capable of communication between processes.
21//!
22//! - Android
23//! - iOS
24//! - WASI
25//!
26//! ## tokio
27//!
28//! This crate uses the [`tokio`](https://crates.io/crates/tokio) runtime for executing futures, and it is a hard requirement that users of `ipc-rpc` must use `tokio`. There are no plans to add support for other
29//! executors, but sufficient demand for other executors may change that.
30//!
31//! # Cargo features
32//!
33//! This crate exposes one feature, `message-schema-validation`, which is on by default. This enables functionality related to the [`schemars`](https://crates.io/crates/schemars) crate.
34//! When enabled, the software will attempt to validate the user message schema on initialization of the connection. Failure to validate is not a critical failure, and won't crash the program.
35//! An error will be emitted in the logs, and this status can be retrieved programmatically via many functions, all called `schema_validated()`.
36//!
37//! If you decide that a failure to validate the schema should be a critical failure you can add the following line of code to your program for execution after a connection is established.
38//!
39//! ### Server
40//! ```no_run
41//! # async {
42//! # let (_key, mut server) = ipc_rpc::IpcRpcServer::initialize_server(|_| async {Option::<()>::None}).await.unwrap();
43//! server.schema_validated().await.unwrap().assert_success();
44//! # };
45//! ```
46//!
47//! ### Client
48//! ```no_run
49//! # async {
50//! # use ipc_rpc::{IpcRpcClient, ConnectionKey};
51//! # let mut client = IpcRpcClient::initialize_client(ConnectionKey::from(String::new()), |_| async {Option::<()>::None}).await.unwrap();
52//! client.schema_validated().await.unwrap().assert_success();
53//! # };
54//! ```
55//!
56//! # Limitations
57//!
58//! Much like `servo/ipc-channel`, servers may only serve one client. Overcoming this limitation would require work within `servo/ipc-channel`.
59
60use std::{
61    collections::HashMap,
62    convert::Infallible,
63    env,
64    ffi::OsString,
65    fmt::Debug,
66    future::Future,
67    io,
68    path::Path,
69    pin::Pin,
70    str::FromStr,
71    sync::Arc,
72    task::{Context, Poll},
73};
74
75use futures::{Stream, StreamExt};
76use ipc_channel::ipc::{IpcError, IpcReceiver, IpcSender, TryRecvError};
77use serde::{de::DeserializeOwned, Deserialize, Serialize};
78use thiserror::Error;
79use tokio::{
80    sync::{mpsc, watch},
81    time::{Duration, Instant},
82};
83use uuid::Uuid;
84
85#[cfg(feature = "message-schema-validation")]
86use schemars::{schema::RootSchema, schema_for, JsonSchema};
87
88mod client;
89pub use client::*;
90mod server;
91pub use server::*;
92
93/// This key can be used to connect to the IPC server it came with, even outside of this process.
94#[derive(Deserialize, Serialize, Debug, Clone)]
95pub struct ConnectionKey(String);
96
97impl From<String> for ConnectionKey {
98    fn from(s: String) -> Self {
99        Self(s)
100    }
101}
102
103impl FromStr for ConnectionKey {
104    type Err = Infallible;
105
106    fn from_str(s: &str) -> Result<Self, Self::Err> {
107        Ok(Self(s.to_string()))
108    }
109}
110
111impl ToString for ConnectionKey {
112    fn to_string(&self) -> String {
113        self.0.clone()
114    }
115}
116
117impl From<ConnectionKey> for OsString {
118    fn from(s: ConnectionKey) -> Self {
119        OsString::from(s.0)
120    }
121}
122
123impl From<ConnectionKey> for String {
124    fn from(key: ConnectionKey) -> Self {
125        key.0
126    }
127}
128
129/// The pending reply box maintains a list of messages that are awaiting a reply. Messages are
130/// paired via their uuids, and delivered via the tokio mpsc framework to a future awaiting
131/// the reply. If a reply never arrives eventually a spawned Future responsible for maintenance
132/// of this mail box will clean them out and resolve the futures with a time out.
133type PendingReplyEntry<U> = (
134    Uuid,
135    (
136        mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
137        Instant,
138    ),
139);
140/// Internal protocol message structure. Wraps the actual user message with some structure
141/// helpful to the mail delivery system. You could liken this to a letter's envelope.
142#[derive(Deserialize, Serialize, Debug, Clone)]
143#[serde(bound(deserialize = ""))]
144struct InternalMessage<U: UserMessage> {
145    /// An identifier used for pairing messages with responses. A response to a message should
146    /// carry the same UUID the message itself had. Otherwise, the UUID should be unique.
147    uuid: uuid::Uuid,
148    /// The actual contents of the message.
149    kind: InternalMessageKind<U>,
150}
151
152/// There are many reasons we might need to send a message. This enumerates them. The enum also
153/// contains some messages intended for internal use that downstream users shouldn't concern
154/// themselves with.
155#[derive(Deserialize, Serialize, Debug, Clone)]
156#[serde(bound(deserialize = ""))]
157enum InternalMessageKind<U: UserMessage> {
158    /// Initialize connection. Includes an IpcSender which can be used to communicate with the client.
159    InitConnection(IpcSender<InternalMessage<U>>),
160    /// The conversation has come to a close, it's time to shut down.
161    Hangup,
162    /// Downstream has a message to exchange.
163    UserMessage(U),
164    /// Describes the user message schema for validation purposes
165    UserMessageSchema(String),
166    /// Returned if the user message schema matched
167    UserMessageSchemaOk,
168    /// Returned if the user message schema did not match
169    UserMessageSchemaError { other_schema: String },
170}
171
172/// Errors which can occur with ipc-rpc during operation.
173#[derive(Clone, Debug, Error)]
174pub enum IpcRpcError {
175    #[error("io error")]
176    IoError(#[from] Arc<io::Error>),
177    #[error("internal ipc channel error")]
178    IpcChannelError(#[from] Arc<ipc_channel::Error>),
179    #[error("connection initialization timed out")]
180    ConnectTimeout,
181    #[error("connection established, but initial handshake was not performed properly")]
182    HandshakeFailure,
183    #[error("client already connected")]
184    ClientAlreadyConnected,
185    #[error("peer disconnected")]
186    Disconnected,
187    #[error("time out while waiting for a reply")]
188    ReplyTimeout,
189    #[error("connection dropped pre-emptively")]
190    ConnectionDropped,
191}
192
193impl From<io::Error> for IpcRpcError {
194    fn from(e: io::Error) -> Self {
195        Self::IoError(Arc::new(e))
196    }
197}
198
199impl From<ipc_channel::Error> for IpcRpcError {
200    fn from(e: ipc_channel::Error) -> Self {
201        Self::IpcChannelError(Arc::new(e))
202    }
203}
204
205/// The default timeout used for `send()` style methods. Use `send_timeout()` to use something else.
206///
207/// This may change in the future. The current value is
208/// ```
209/// # use tokio::time::Duration;
210/// # assert_eq!(
211/// # ipc_rpc::DEFAULT_REPLY_TIMEOUT,
212/// Duration::from_secs(5)
213/// # );
214/// ```
215pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
216
217/// Processes incoming messages for both the client and the server. Responsible for distributing
218/// and generating replies.
219async fn process_incoming_mail<
220    Fut: Future<Output = Option<U>> + Send,
221    F: Fn(U) -> Fut + Send + Sync + 'static,
222    U: UserMessage,
223>(
224    is_server: bool,
225    mut pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
226    mut receiver: IpcReceiveStream<InternalMessage<U>>,
227    message_handler: F,
228    response_sender: IpcSender<InternalMessage<U>>,
229    status_sender: watch::Sender<ConnectionStatus>,
230) {
231    let mut pending_replies = HashMap::<
232        Uuid,
233        (
234            mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
235            Instant,
236        ),
237    >::new();
238    let message_handler = Arc::new(message_handler);
239    let log_prefix = get_log_prefix(is_server);
240    log::info!("{}Processing incoming mail!", log_prefix);
241    let mut consecutive_error_count = 0;
242    let mut pending_reply_scheduled_time = Option::<Instant>::None;
243
244    let add_pending_reply = |pending_reply_scheduled_time: &mut Option<Instant>,
245                             pending_replies: &mut HashMap<_, _>,
246                             (key, (sender, timeout))| {
247        *pending_reply_scheduled_time = Some(
248            pending_reply_scheduled_time
249                .map(|t| t.min(timeout))
250                .unwrap_or(timeout),
251        );
252
253        pending_replies.insert(key, (sender, timeout));
254    };
255    loop {
256        // Empty out the pending reply receiver before entering the select below. This guarantees
257        // that all queued reply drop boxes are processed before we start receiving incoming mail.
258        while let Ok(pending_reply) = pending_reply_receiver.try_recv() {
259            add_pending_reply(
260                &mut pending_reply_scheduled_time,
261                &mut pending_replies,
262                pending_reply,
263            );
264        }
265        tokio::select! {
266            true = async { if let Some(t) = pending_reply_scheduled_time { tokio::time::sleep_until(t).await; true } else { false } } => {
267                pending_replies.retain(|_k, v| {
268                    let keep = v.1 > Instant::now();
269                    if !keep {
270                        let _ = v.0.send(Err(IpcRpcError::ReplyTimeout));
271                    }
272                    keep
273                });
274                pending_reply_scheduled_time = pending_replies.values().map(|i| i.1).min();
275            }
276            pending_reply = pending_reply_receiver.recv() => {
277                match pending_reply {
278                    None => {
279                        // Sender got dropped, time to close up shop.
280                        break;
281                    }
282                    Some(pending_reply) => {
283                        add_pending_reply(&mut pending_reply_scheduled_time, &mut pending_replies, pending_reply);
284                    }
285                }
286            },
287            r = receiver.next() => {
288                match r {
289                    None => {
290                        // incoming mail stream ended, time to shut down.
291                        break;
292                    }
293                    Some(Err(e)) => {
294                        if let IpcError::Disconnected = e {
295                            log::info!("{}Peer disconnected.", log_prefix);
296                            break;
297                        } else {
298                            log::error!("{}Error receiving message from peer {:?}", log_prefix, e);
299                            consecutive_error_count += 1;
300                            if consecutive_error_count > 20 {
301                                log::error!("{}Too many consecutive errors, shutting down.", log_prefix);
302                                break;
303                            }
304                        }
305                    }
306                    Some(Ok(message)) => {
307                        consecutive_error_count = 0;
308                        log::debug!("{}Got message! {:?}", log_prefix, message);
309                        let reply = pending_replies.remove(&message.uuid);
310                        if let Some((reply_drop_box, _)) = reply {
311                            log::debug!("{}It's a reply, forwarding!", log_prefix);
312                            // If the end user doesn't want this message that's fine.
313                            let _ = reply_drop_box.send(Ok(message.kind));
314                        } else {
315                            log::debug!("{}It's not a reply, handling!", log_prefix);
316                            let message_uuid = message.uuid;
317                            match message.kind {
318                                InternalMessageKind::UserMessage(user_message) => {
319                                    let message_handler = Arc::clone(&message_handler);
320                                    let response_sender = response_sender.clone();
321                                    tokio::spawn(async move {
322                                        if let Some(m) = message_handler(user_message).await {
323                                            let r = response_sender.send(InternalMessage {
324                                                uuid: message_uuid,
325                                                kind: InternalMessageKind::UserMessage(m),
326                                            });
327                                            if let Err(e) = r {
328                                                log::error!("Failed to send reply {e:?}");
329                                            }
330                                        }
331                                    });
332                                }
333                                #[cfg(feature = "message-schema-validation")]
334                                InternalMessageKind::UserMessageSchema(other_schema) => {
335                                    let my_schema = schema_for!(U);
336                                    let kind = match serde_json::from_str::<RootSchema>(&other_schema) {
337                                        Ok(other_schema) => {
338                                            if other_schema == my_schema {
339                                                InternalMessageKind::UserMessageSchemaOk
340                                            } else {
341                                                InternalMessageKind::UserMessageSchemaError {
342                                                    other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
343                                                }
344                                            }
345                                        },
346                                        Err(_) => {
347                                            log::error!("Failed to deserialize incoming schema properly, got {other_schema:?}");
348                                            InternalMessageKind::UserMessageSchemaError {
349                                                other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
350                                            }
351                                        }
352                                    };
353                                    let r = response_sender.send(InternalMessage {
354                                        uuid: message_uuid,
355                                        kind,
356                                    });
357                                    if let Err(e) = r {
358                                        log::error!("Failed to send validation response {e:#?}");
359                                    }
360                                }
361                                InternalMessageKind::Hangup => {
362                                    break;
363                                }
364                                _ => {}
365                            }
366                        }
367                    }
368                }
369            }
370        }
371    }
372    let _ = status_sender.send(ConnectionStatus::DisconnectedCleanly);
373}
374
375fn get_log_prefix(is_server: bool) -> String {
376    let first_arg = env::args()
377        .next()
378        .unwrap_or_else(|| String::from("Unknown"));
379    let process = Path::new(&first_arg)
380        .file_name()
381        .unwrap_or_else(|| "Unknown".as_ref())
382        .to_string_lossy();
383    if is_server {
384        format!("{} as Server: ", process)
385    } else {
386        format!("{} as Client: ", process)
387    }
388}
389
390/// Reports the status of the connection between the server and the client
391#[derive(Clone, Debug)]
392pub enum ConnectionStatus {
393    /// The server is waiting for a client to connect. A client never waits for a server to connect.
394    WaitingForClient,
395    /// The connection is active.
396    Connected,
397    /// A shutdown happened, and this was normal. Nothing unexpected happened.
398    DisconnectedCleanly,
399    /// An unexpected shutdown occurred, error contained within.
400    DisconnectError(IpcRpcError),
401}
402
403impl ConnectionStatus {
404    pub fn session_end_result(&self) -> Option<Result<(), IpcRpcError>> {
405        match self {
406            ConnectionStatus::WaitingForClient | ConnectionStatus::Connected => None,
407            ConnectionStatus::DisconnectedCleanly => Some(Ok(())),
408            ConnectionStatus::DisconnectError(e) => Some(Err(e.clone())),
409        }
410    }
411}
412
413/// This future represents a reply to a previous message. It is not a public type. This is to permit
414/// future flexibility in how this interface is implemented.
415struct IpcReplyFuture<U: UserMessage> {
416    receiver: mpsc::UnboundedReceiver<Result<InternalMessageKind<U>, IpcRpcError>>,
417}
418
419impl<U: UserMessage> Future for IpcReplyFuture<U> {
420    type Output = Result<InternalMessageKind<U>, IpcRpcError>;
421
422    fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
423        self.receiver.poll_recv(cx).map(|o| match o {
424            Some(m) => m,
425            None => Err(IpcRpcError::ConnectionDropped),
426        })
427    }
428}
429
430struct IpcReceiveStream<T> {
431    receiver: mpsc::UnboundedReceiver<Result<T, IpcError>>,
432}
433
434impl<T> IpcReceiveStream<T>
435where
436    T: Send + for<'de> Deserialize<'de> + Serialize + 'static,
437{
438    pub fn new(ipc_receiver: IpcReceiver<T>) -> Self {
439        let (sender, receiver) = mpsc::unbounded_channel();
440        tokio::task::spawn_blocking(move || loop {
441            match ipc_receiver.try_recv_timeout(Duration::from_millis(250)) {
442                Ok(msg) => {
443                    if sender.send(Ok(msg)).is_err() {
444                        break;
445                    }
446                }
447                Err(TryRecvError::IpcError(e)) => {
448                    if sender.send(Err(e)).is_err() {
449                        break;
450                    }
451                }
452                Err(TryRecvError::Empty) => {
453                    if sender.is_closed() {
454                        break;
455                    }
456                }
457            }
458        });
459        Self { receiver }
460    }
461}
462
463impl<T> Stream for IpcReceiveStream<T> {
464    type Item = Result<T, IpcError>;
465
466    fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
467        self.receiver.poll_recv(cx)
468    }
469}
470
471/// Reports the outcome of automatic schema validation testing. This testing is performed
472/// on connection initiation.
473#[derive(Debug, Clone)]
474pub enum SchemaValidationStatus {
475    /// Crate was compiled without default features, so validation does not function.
476    ValidationDisabledAtCompileTime,
477    /// Internal error
478    ValidationNotPerformedProperly,
479    /// Internal error
480    ValidationCommunicationFailed(IpcRpcError),
481    /// Schemas matched, all clear
482    SchemasMatched,
483    /// Schemas didn't match, schemas provided for comparison
484    SchemaMismatch {
485        our_schema: String,
486        their_schema: String,
487    },
488}
489
490impl SchemaValidationStatus {
491    /// Returns true if this is an instance of [`Self::SchemasMatched`], and false otherwise.
492    pub fn is_success(&self) -> bool {
493        matches!(self, SchemaValidationStatus::SchemasMatched)
494    }
495
496    /// Panics if this status is not [`Self::SchemasMatched`], and prints the debug information into the panic.
497    pub fn assert_success(&self) {
498        if !self.is_success() {
499            panic!("ipc-rpc user message schema failed to validate, error {self:#?}");
500        }
501    }
502}
503
504/// A combination trait which represents all of the traits needed for a type to be
505/// used as a message with this library.
506///
507/// # Note on `JsonSchema`
508/// `JsonSchema` is a trait from the [`schemars`](https://crates.io/crates/schemars) crate.
509/// Deriving it for your message allows `ipc-rpc` to perform validation on message schemas
510/// after communication has started. The results of this validation are logged, and stored in
511/// [IpcRpcClient::schema_validated], [IpcRpcServer::schema_validated], and [IpcRpc::schema_validated].
512/// A failed validation will not crash the program.
513///
514/// Despite what this may imply, `ipc-rpc` does not use JSON for messages internally.
515///
516/// If you find yourself unable to derive `JsonSchema` then consider turning off
517/// default features for the `ipc-rpc` crate. Once you do, the `JsonSchema` requirement
518/// will go away.
519#[cfg(feature = "message-schema-validation")]
520pub trait UserMessage:
521    'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
522{
523}
524
525#[cfg(feature = "message-schema-validation")]
526impl<T> UserMessage for T where
527    T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
528{
529}
530
531/// A combination trait which represents all of the traits needed for a type to be
532/// used as a message with this library.
533#[cfg(not(feature = "message-schema-validation"))]
534pub trait UserMessage: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
535
536#[cfg(not(feature = "message-schema-validation"))]
537impl<T> UserMessage for T where T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
538
539/// Invokes an RPC call within the current async runtime.
540///
541/// # Params
542/// - `sender`: A reference to an [IpcRpcServer], [IpcRpcClient], or [IpcRpc]
543/// - `to_send`: The message sent to the remote
544/// - `receiver`: The expected response pattern, followed by a handler for it.
545///
546/// # Returns
547/// The value returned by `receiver`.
548///
549/// # Panics
550/// Panics if the message received from the remote doesn't match the pattern specified
551/// in receiver.
552///
553/// # Example
554///
555/// ```no_run
556/// use serde::{Deserialize, Serialize};
557/// use schemars::JsonSchema;
558/// use std::fmt::Debug;
559///
560/// #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
561/// enum Message {
562///     MakeMeASandwich,
563///     /// The sandwiches are made of i32 around here, don't judge.
564///     ASandwich(Vec<i32>),
565/// }
566///
567/// // Initialize a client
568///
569/// # async {
570/// # use ipc_rpc::{IpcRpcClient, ConnectionKey, rpc_call};
571/// # let mut client = IpcRpcClient::initialize_client(ConnectionKey::from(String::new()), |_| async {Option::<Message>::None}).await.unwrap();
572/// rpc_call!(
573///     sender: client,
574///     to_send: Message::MakeMeASandwich,
575///     receiver: Message::ASandwich(parts) => {
576///         log::info!("I got a sandwich! It contains\n{parts:#?}");
577///     },
578/// )
579/// .unwrap()
580/// # };
581/// ```
582#[macro_export]
583macro_rules! rpc_call {
584    (sender: $sender:expr, to_send: $to_send:expr, receiver: $received:pat_param => $to_do:block,) => {
585        $sender.send($to_send).await.map(|m| match m {
586            $received => $to_do,
587            _ => panic!("rpc_call response didn't match given pattern"),
588        })
589    };
590}
591
592#[cfg(test)]
593mod tests {
594    use tokio::time::timeout;
595
596    use super::*;
597
598    #[cfg(not(feature = "message-schema-validation"))]
599    compile_error!("Tests must be executed with all features on");
600
601    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
602    pub struct IpcProtocolMessage {
603        pub kind: IpcProtocolMessageKind,
604    }
605
606    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
607    pub enum IpcProtocolMessageKind {
608        TestMessage,
609        ClientTestReply,
610        ServerTestReply,
611    }
612
613    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
614    async fn basic_dialogue() {
615        let (server_key, mut server) =
616            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
617                match message.kind {
618                    IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
619                        kind: IpcProtocolMessageKind::ServerTestReply,
620                    }),
621                    _ => None,
622                }
623            })
624            .await
625            .unwrap();
626        let mut client = client::IpcRpcClient::initialize_client(
627            server_key,
628            |message: IpcProtocolMessage| async move {
629                match message.kind {
630                    IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
631                        kind: IpcProtocolMessageKind::ClientTestReply,
632                    }),
633                    _ => None,
634                }
635            },
636        )
637        .await
638        .unwrap();
639        server.schema_validated().await.unwrap().assert_success();
640        client.schema_validated().await.unwrap().assert_success();
641        let client_reply = server
642            .send(IpcProtocolMessage {
643                kind: IpcProtocolMessageKind::TestMessage,
644            })
645            .await;
646        if !matches!(
647            client_reply.as_ref().map(|r| &r.kind),
648            Ok(IpcProtocolMessageKind::ClientTestReply)
649        ) {
650            panic!("client reply was of unexpected type: {:?}", client_reply);
651        }
652        let server_reply = client
653            .send(IpcProtocolMessage {
654                kind: IpcProtocolMessageKind::TestMessage,
655            })
656            .await;
657        if !matches!(
658            server_reply.as_ref().map(|r| &r.kind),
659            Ok(IpcProtocolMessageKind::ServerTestReply)
660        ) {
661            panic!("server reply was of unexpected type: {:?}", server_reply);
662        }
663    }
664
665    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
666    async fn send_without_await() {
667        let (server_success_sender, mut server_success_receiver) = mpsc::unbounded_channel();
668        let (client_success_sender, mut client_success_receiver) = mpsc::unbounded_channel();
669        let (server_key, mut server) =
670            server::IpcRpcServer::initialize_server(move |message: IpcProtocolMessage| {
671                let server_success_sender = server_success_sender.clone();
672                async move {
673                    match message.kind {
674                        IpcProtocolMessageKind::TestMessage => {
675                            server_success_sender.send(()).unwrap()
676                        }
677                        _ => {}
678                    }
679                    None
680                }
681            })
682            .await
683            .unwrap();
684        let mut client = client::IpcRpcClient::initialize_client(
685            server_key,
686            move |message: IpcProtocolMessage| {
687                let client_success_sender = client_success_sender.clone();
688                async move {
689                    match message.kind {
690                        IpcProtocolMessageKind::TestMessage => {
691                            client_success_sender.send(()).unwrap()
692                        }
693                        _ => {}
694                    }
695                    None
696                }
697            },
698        )
699        .await
700        .unwrap();
701        server.schema_validated().await.unwrap().assert_success();
702        client.schema_validated().await.unwrap().assert_success();
703        let _ = server.send(IpcProtocolMessage {
704            kind: IpcProtocolMessageKind::TestMessage,
705        });
706        let _ = client.send(IpcProtocolMessage {
707            kind: IpcProtocolMessageKind::TestMessage,
708        });
709        assert_eq!(
710            timeout(Duration::from_secs(3), server_success_receiver.recv()).await,
711            Ok(Some(()))
712        );
713        assert_eq!(
714            timeout(Duration::from_secs(3), client_success_receiver.recv()).await,
715            Ok(Some(()))
716        );
717    }
718
719    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
720    async fn timeout_test() {
721        let (server_key, mut server) =
722            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
723                match message.kind {
724                    _ => None,
725                }
726            })
727            .await
728            .unwrap();
729        let mut client = client::IpcRpcClient::initialize_client(
730            server_key,
731            |message: IpcProtocolMessage| async move {
732                match message.kind {
733                    _ => None,
734                }
735            },
736        )
737        .await
738        .unwrap();
739        server.schema_validated().await.unwrap().assert_success();
740        client.schema_validated().await.unwrap().assert_success();
741        let wait_start = Instant::now();
742        let client_reply = server
743            .send(IpcProtocolMessage {
744                kind: IpcProtocolMessageKind::TestMessage,
745            })
746            .await;
747        assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
748        if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
749            panic!("client reply was of unexpected type: {:?}", client_reply);
750        }
751        let wait_start = Instant::now();
752        let server_reply = client
753            .send(IpcProtocolMessage {
754                kind: IpcProtocolMessageKind::TestMessage,
755            })
756            .await;
757        assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
758        if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
759            panic!("server reply was of unexpected type: {:?}", server_reply);
760        }
761    }
762
763    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
764    async fn custom_timeout_test() {
765        let (server_key, mut server) =
766            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
767                match message.kind {
768                    _ => None,
769                }
770            })
771            .await
772            .unwrap();
773        let mut client = client::IpcRpcClient::initialize_client(
774            server_key,
775            |message: IpcProtocolMessage| async move {
776                match message.kind {
777                    _ => None,
778                }
779            },
780        )
781        .await
782        .unwrap();
783        server.schema_validated().await.unwrap().assert_success();
784        client.schema_validated().await.unwrap().assert_success();
785        let custom_timeout: Duration = DEFAULT_REPLY_TIMEOUT / 2;
786        let wait_start = Instant::now();
787        let client_reply = server
788            .send_timeout(
789                IpcProtocolMessage {
790                    kind: IpcProtocolMessageKind::TestMessage,
791                },
792                custom_timeout,
793            )
794            .await;
795        assert!(wait_start.elapsed() >= custom_timeout);
796        assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
797        if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
798            panic!("client reply was of unexpected type: {:?}", client_reply);
799        }
800        let wait_start = Instant::now();
801        let server_reply = client
802            .send_timeout(
803                IpcProtocolMessage {
804                    kind: IpcProtocolMessageKind::TestMessage,
805                },
806                custom_timeout,
807            )
808            .await;
809        assert!(wait_start.elapsed() >= custom_timeout);
810        assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
811        if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
812            panic!("server reply was of unexpected type: {:?}", server_reply);
813        }
814    }
815    #[test_log::test]
816    fn server_drop_does_not_hang() {
817        let thread = std::thread::spawn(|| {
818            let runtime = tokio::runtime::Runtime::new().unwrap();
819            runtime.block_on(async {
820                let (_server_key, _server) = server::IpcRpcServer::initialize_server(
821                    |message: IpcProtocolMessage| async move {
822                        match message.kind {
823                            _ => None,
824                        }
825                    },
826                )
827                .await
828                .unwrap();
829            })
830        });
831
832        let start = Instant::now();
833        let timeout = Duration::from_secs(5);
834        while !thread.is_finished() {
835            if start.elapsed() >= timeout {
836                // Server drop is hanging, force quit with non-zero
837                std::process::exit(1);
838            }
839        }
840    }
841
842    // This test checks to see if a task has come to an end, so it must wait for the runtime to drop.
843    // Therefore tokio::test is not an option.
844    #[test_log::test]
845    fn server_disconnect_test() {
846        // We use an empty Arc as a resource inside the message handler to prove the message handler
847        // was dropped. If the message handler was dropped then the cleanup routines were executed.
848        let drop_detector = Arc::new(());
849        let drop_detector_clone = drop_detector.clone();
850        let runtime = tokio::runtime::Runtime::new().unwrap();
851        runtime.block_on(async move {
852            let (server_key, server) = server::IpcRpcServer::initialize_server({
853                let drop_detector_clone = drop_detector_clone.clone();
854                move |message: IpcProtocolMessage| {
855                    let drop_detector_clone = drop_detector_clone.clone();
856                    async move {
857                        match message.kind {
858                            _ => {
859                                // I don't expect this to ever be called, I just need the closure to take
860                                // ownership of the Arc.
861                                let _ = drop_detector_clone.clone();
862                                None
863                            }
864                        }
865                    }
866                }
867            })
868            .await
869            .unwrap();
870            let client = client::IpcRpcClient::initialize_client(
871                server_key,
872                |message: IpcProtocolMessage| async move {
873                    match message.kind {
874                        _ => None,
875                    }
876                },
877            )
878            .await
879            .unwrap();
880            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
881            drop(server);
882            client.wait_for_server_to_disconnect().await.unwrap();
883        });
884        // It's important that the runtime be able to shut down quickly, so we'll assert it
885        // shut down within 3 seconds.
886        let start_shutdown = Instant::now();
887        runtime.shutdown_timeout(Duration::from_secs(5));
888        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
889        assert_eq!(Arc::strong_count(&drop_detector), 1);
890    }
891
892    // This test checks to see if a task has come to an end, so it must wait for the runtime to drop.
893    // Therefore tokio::test is not an option.
894    #[test_log::test]
895    fn client_disconnect_test() {
896        use std::time::Instant;
897
898        // We use an empty Arc as a resource inside the message handler to prove the message handler
899        // was dropped. If the message handler was dropped then the cleanup routines were executed.
900        let drop_detector = Arc::new(());
901        let drop_detector_clone = drop_detector.clone();
902        let runtime = tokio::runtime::Runtime::new().unwrap();
903        runtime.block_on(async move {
904            let (server_key, mut server) =
905                server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
906                    match message.kind {
907                        _ => None,
908                    }
909                })
910                .await
911                .unwrap();
912            let client = client::IpcRpcClient::initialize_client(server_key, {
913                let drop_detector_clone = drop_detector_clone.clone();
914                move |message: IpcProtocolMessage| {
915                    let _drop_detector_clone = drop_detector_clone.clone();
916                    async move {
917                        match message.kind {
918                            _ => None,
919                        }
920                    }
921                }
922            })
923            .await
924            .unwrap();
925            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
926            drop(client);
927            server.wait_for_client_to_disconnect().await.unwrap();
928        });
929        // It's important that the runtime be able to shut down quickly, so we'll assert it
930        // shut down within 3 seconds.
931        let start_shutdown = Instant::now();
932        runtime.shutdown_timeout(Duration::from_secs(5));
933        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
934        assert_eq!(Arc::strong_count(&drop_detector), 1);
935    }
936
937    #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
938    async fn rpc_call_macro_test() {
939        let (server_key, mut server) =
940            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
941                match message.kind {
942                    _ => Some(IpcProtocolMessage {
943                        kind: IpcProtocolMessageKind::ServerTestReply,
944                    }),
945                }
946            })
947            .await
948            .unwrap();
949        let mut client = client::IpcRpcClient::initialize_client(
950            server_key,
951            |message: IpcProtocolMessage| async move {
952                match message.kind {
953                    _ => Some(IpcProtocolMessage {
954                        kind: IpcProtocolMessageKind::ClientTestReply,
955                    }),
956                }
957            },
958        )
959        .await
960        .unwrap();
961        server.schema_validated().await.unwrap().assert_success();
962        client.schema_validated().await.unwrap().assert_success();
963        rpc_call!(
964            sender: server,
965            to_send: IpcProtocolMessage {
966                kind: IpcProtocolMessageKind::TestMessage
967            },
968            receiver: IpcProtocolMessage {
969                kind: IpcProtocolMessageKind::ClientTestReply
970            } => {
971
972            },
973        )
974        .unwrap();
975        rpc_call!(
976            sender: client,
977            to_send: IpcProtocolMessage {
978                kind: IpcProtocolMessageKind::TestMessage
979            },
980            receiver: IpcProtocolMessage {
981                kind: IpcProtocolMessageKind::ServerTestReply
982            } => {
983
984            },
985        )
986        .unwrap();
987    }
988}