1use std::{
63 collections::HashMap,
64 convert::Infallible,
65 env,
66 ffi::OsString,
67 fmt::{self, Debug, Display},
68 future::Future,
69 io,
70 path::Path,
71 pin::Pin,
72 str::FromStr,
73 sync::Arc,
74 task::{Context, Poll},
75};
76
77use futures::{Stream, StreamExt};
78use ipc_channel::ipc::{IpcError, IpcReceiver, IpcSender, TryRecvError};
79use serde::{de::DeserializeOwned, Deserialize, Serialize};
80use thiserror::Error;
81use tokio::{
82 sync::{mpsc, watch},
83 time::{Duration, Instant},
84};
85use uuid::Uuid;
86
87#[cfg(feature = "message-schema-validation")]
88use schemars::{schema_for, JsonSchema, Schema};
89
90mod client;
91pub use client::*;
92mod server;
93pub use server::*;
94
/// Textual key used to refer to a connection.
///
/// This is a newtype over a [`String`]. It can be built from a `String` or parsed from a `&str`
/// (parsing never fails), printed via [`Display`], and converted back into a `String` or an
/// [`OsString`].
// NOTE(review): presumably this is the value handed from the server side to the client side so
// the client can find the server — confirm against the `client` / `server` modules.
#[derive(Deserialize, Serialize, Debug, Clone)]
pub struct ConnectionKey(String);
98
99impl From<String> for ConnectionKey {
100 fn from(s: String) -> Self {
101 Self(s)
102 }
103}
104
105impl FromStr for ConnectionKey {
106 type Err = Infallible;
107
108 fn from_str(s: &str) -> Result<Self, Self::Err> {
109 Ok(Self(s.to_string()))
110 }
111}
112
113impl Display for ConnectionKey {
114 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
115 write!(f, "{}", self.0)
116 }
117}
118
119impl From<ConnectionKey> for OsString {
120 fn from(s: ConnectionKey) -> Self {
121 OsString::from(s.0)
122 }
123}
124
125impl From<ConnectionKey> for String {
126 fn from(key: ConnectionKey) -> Self {
127 key.0
128 }
129}
130
/// One registration of "someone is waiting for a reply", as consumed by `process_incoming_mail`.
///
/// Layout: `(message uuid, (sender on which the outcome is delivered, deadline))`.
/// An incoming message carrying the same uuid is forwarded down the sender as `Ok(kind)`;
/// once the deadline `Instant` has passed the sender receives `Err(IpcRpcError::ReplyTimeout)`.
type PendingReplyEntry<U> = (
    Uuid,
    (
        mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
        Instant,
    ),
);
/// Envelope for everything that travels over the IPC channel.
#[derive(Deserialize, Serialize, Debug, Clone)]
// Suppress serde's inferred `Deserialize` bound on `U`; the `UserMessage` bound on the type
// already requires `DeserializeOwned`.
#[serde(bound(deserialize = ""))]
struct InternalMessage<U: UserMessage> {
    /// Identifier of the exchange. A reply reuses the uuid of the message it answers, which is
    /// how `process_incoming_mail` matches replies to pending waiters.
    uuid: uuid::Uuid,
    /// What the message actually carries.
    kind: InternalMessageKind<U>,
}
153
/// Payload variants of an [`InternalMessage`].
#[derive(Deserialize, Serialize, Debug, Clone)]
// Suppress serde's inferred `Deserialize` bound on `U`; `UserMessage` already requires
// `DeserializeOwned`.
#[serde(bound(deserialize = ""))]
enum InternalMessageKind<U: UserMessage> {
    /// Carries a sender for `InternalMessage`s.
    // NOTE(review): presumably exchanged during connection setup so the peer can talk back;
    // the handshake itself lives outside this part of the file — confirm.
    InitConnection(IpcSender<InternalMessage<U>>),
    /// Tells the receiving side to stop: `process_incoming_mail` leaves its loop on receipt.
    Hangup,
    /// A message defined by the user of the library.
    UserMessage(U),
    /// The sender's schema for `U`, serialized as JSON. Only acted upon when the
    /// `message-schema-validation` feature is enabled.
    UserMessageSchema(String),
    /// Reply to `UserMessageSchema`: the received schema equals the local one.
    UserMessageSchemaOk,
    /// Reply to `UserMessageSchema`: the received schema differs from the local one or could
    /// not be parsed. `other_schema` holds the replying side's own schema as JSON.
    UserMessageSchemaError { other_schema: String },
}
173
/// Errors reported by this crate.
///
/// The wrapped `io::Error` and `ipc_channel::Error` values are held in an [`Arc`], which lets
/// the enum derive `Clone`.
#[derive(Clone, Debug, Error)]
pub enum IpcRpcError {
    /// An I/O error occurred.
    #[error("io error")]
    IoError(#[from] Arc<io::Error>),
    /// The underlying `ipc_channel` reported an error.
    #[error("internal ipc channel error")]
    IpcChannelError(#[from] Arc<ipc_channel::Error>),
    /// Connection initialization timed out.
    #[error("connection initialization timed out")]
    ConnectTimeout,
    /// A connection was established, but the initial handshake was not performed properly.
    #[error("connection established, but initial handshake was not performed properly")]
    HandshakeFailure,
    /// A client is already connected.
    #[error("client already connected")]
    ClientAlreadyConnected,
    /// The peer disconnected.
    #[error("peer disconnected")]
    Disconnected,
    /// No reply arrived before the deadline registered for the request.
    #[error("time out while waiting for a reply")]
    ReplyTimeout,
    /// The channel on which a reply would have been delivered was closed before any outcome
    /// was sent (see `IpcReplyFuture`).
    #[error("connection dropped pre-emptively")]
    ConnectionDropped,
}
194
195impl From<io::Error> for IpcRpcError {
196 fn from(e: io::Error) -> Self {
197 Self::IoError(Arc::new(e))
198 }
199}
200
201impl From<ipc_channel::Error> for IpcRpcError {
202 fn from(e: ipc_channel::Error) -> Self {
203 Self::IpcChannelError(Arc::new(e))
204 }
205}
206
/// Default time to wait for a reply: five seconds.
///
/// The tests in this file show that a plain `send` whose peer never answers resolves to
/// [`IpcRpcError::ReplyTimeout`] no sooner than this duration, and that `send_timeout` accepts
/// a different duration instead.
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
218
/// Event loop for one end of a connection; runs until the connection ends.
///
/// Every iteration waits on three event sources at once:
///
/// * the earliest deadline among the pending replies — expired entries are completed with
///   [`IpcRpcError::ReplyTimeout`];
/// * `pending_reply_receiver` — new registrations of replies somebody is waiting for;
/// * `receiver` — messages from the peer. A message whose uuid matches a pending reply is
///   forwarded to that waiter; any other message is treated as a new request and dispatched
///   on its kind.
///
/// # Parameters
///
/// * `is_server` — only used to build the log prefix.
/// * `pending_reply_receiver` — source of [`PendingReplyEntry`] registrations.
/// * `receiver` — stream of messages (or receive errors) coming from the peer.
/// * `message_handler` — called on a spawned task for each incoming `UserMessage` that is not
///   a reply; a `Some` result is sent back to the peer under the uuid of the request.
/// * `response_sender` — channel to the peer, used for handler replies and schema-validation
///   responses.
/// * `status_sender` — receives [`ConnectionStatus::DisconnectedCleanly`] when the loop exits,
///   whatever the reason for exiting was.
///
/// # Termination
///
/// The loop ends when the registration channel is closed, the incoming stream ends, the peer
/// is reported as disconnected, more than 20 receive errors occur in a row, or a `Hangup`
/// message arrives.
async fn process_incoming_mail<
    Fut: Future<Output = Option<U>> + Send,
    F: Fn(U) -> Fut + Send + Sync + 'static,
    U: UserMessage,
>(
    is_server: bool,
    mut pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
    mut receiver: IpcReceiveStream<InternalMessage<U>>,
    message_handler: F,
    response_sender: IpcSender<InternalMessage<U>>,
    status_sender: watch::Sender<ConnectionStatus>,
) {
    // uuid -> (where to deliver the outcome, deadline after which the wait times out)
    let mut pending_replies = HashMap::<
        Uuid,
        (
            mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
            Instant,
        ),
    >::new();
    // Shared so that every spawned handler task can hold its own reference.
    let message_handler = Arc::new(message_handler);
    let log_prefix = get_log_prefix(is_server);
    log::info!("{}Processing incoming mail!", log_prefix);
    // Receive errors seen in a row; reset by every successfully received message.
    let mut consecutive_error_count = 0;
    // Earliest deadline among `pending_replies`, or `None` when nothing is pending.
    let mut pending_reply_scheduled_time = Option::<Instant>::None;

    // Records a registration and pulls the next wake-up time forward if this entry expires
    // sooner than anything already tracked.
    let add_pending_reply = |pending_reply_scheduled_time: &mut Option<Instant>,
                             pending_replies: &mut HashMap<_, _>,
                             (key, (sender, timeout))| {
        *pending_reply_scheduled_time = Some(
            pending_reply_scheduled_time
                .map(|t| t.min(timeout))
                .unwrap_or(timeout),
        );

        pending_replies.insert(key, (sender, timeout));
    };
    loop {
        // Take in every registration that is already queued before waiting on anything, so
        // they are in `pending_replies` by the time the next peer message is examined.
        while let Ok(pending_reply) = pending_reply_receiver.try_recv() {
            add_pending_reply(
                &mut pending_reply_scheduled_time,
                &mut pending_replies,
                pending_reply,
            );
        }
        tokio::select! {
            // Timer branch. With no deadline tracked the future yields `false` immediately,
            // which does not match the `true` pattern, so `select!` disables this branch for
            // the current iteration and keeps waiting on the others.
            true = async { if let Some(t) = pending_reply_scheduled_time { tokio::time::sleep_until(t).await; true } else { false } } => {
                // Fail every waiter whose deadline has passed and drop its entry.
                pending_replies.retain(|_k, v| {
                    let keep = v.1 > Instant::now();
                    if !keep {
                        // The waiter may already be gone; nothing to do in that case.
                        let _ = v.0.send(Err(IpcRpcError::ReplyTimeout));
                    }
                    keep
                });
                // Re-arm for the earliest remaining deadline (`None` if the map is now empty).
                pending_reply_scheduled_time = pending_replies.values().map(|i| i.1).min();
            }
            pending_reply = pending_reply_receiver.recv() => {
                match pending_reply {
                    None => {
                        // All registration senders are gone: end the loop.
                        break;
                    }
                    Some(pending_reply) => {
                        add_pending_reply(&mut pending_reply_scheduled_time, &mut pending_replies, pending_reply);
                    }
                }
            },
            r = receiver.next() => {
                match r {
                    None => {
                        // The incoming stream has ended.
                        break;
                    }
                    Some(Err(e)) => {
                        if let IpcError::Disconnected = e {
                            log::info!("{}Peer disconnected.", log_prefix);
                            break;
                        } else {
                            // Other receive errors are tolerated until too many occur in a row.
                            log::error!("{}Error receiving message from peer {:?}", log_prefix, e);
                            consecutive_error_count += 1;
                            if consecutive_error_count > 20 {
                                log::error!("{}Too many consecutive errors, shutting down.", log_prefix);
                                break;
                            }
                        }
                    }
                    Some(Ok(message)) => {
                        consecutive_error_count = 0;
                        log::debug!("{}Got message! {:?}", log_prefix, message);
                        // A uuid we are waiting on marks the message as a reply.
                        let reply = pending_replies.remove(&message.uuid);
                        if let Some((reply_drop_box, _)) = reply {
                            log::debug!("{}It's a reply, forwarding!", log_prefix);
                            // The waiter may have given up already; ignore a closed channel.
                            let _ = reply_drop_box.send(Ok(message.kind));
                        } else {
                            log::debug!("{}It's not a reply, handling!", log_prefix);
                            let message_uuid = message.uuid;
                            match message.kind {
                                InternalMessageKind::UserMessage(user_message) => {
                                    let message_handler = Arc::clone(&message_handler);
                                    let response_sender = response_sender.clone();
                                    // Run the handler on its own task so this loop keeps
                                    // processing while the handler is awaited.
                                    tokio::spawn(async move {
                                        if let Some(m) = message_handler(user_message).await {
                                            // Reply under the request's uuid so the peer can
                                            // match it to its pending entry.
                                            let r = response_sender.send(InternalMessage {
                                                uuid: message_uuid,
                                                kind: InternalMessageKind::UserMessage(m),
                                            });
                                            if let Err(e) = r {
                                                log::error!("Failed to send reply {e:?}");
                                            }
                                        }
                                    });
                                }
                                #[cfg(feature = "message-schema-validation")]
                                InternalMessageKind::UserMessageSchema(other_schema) => {
                                    // Compare the peer's schema with ours and report the
                                    // verdict; on mismatch or parse failure our own schema is
                                    // sent back as JSON.
                                    let my_schema = schema_for!(U);
                                    let kind = match serde_json::from_str::<Schema>(&other_schema) {
                                        Ok(other_schema) => {
                                            if other_schema == my_schema {
                                                InternalMessageKind::UserMessageSchemaOk
                                            } else {
                                                InternalMessageKind::UserMessageSchemaError {
                                                    other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
                                                }
                                            }
                                        },
                                        Err(_) => {
                                            // `other_schema` here is the raw string received
                                            // from the peer, not a parsed schema.
                                            log::error!("Failed to deserialize incoming schema properly, got {other_schema:?}");
                                            InternalMessageKind::UserMessageSchemaError {
                                                other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
                                            }
                                        }
                                    };
                                    let r = response_sender.send(InternalMessage {
                                        uuid: message_uuid,
                                        kind,
                                    });
                                    if let Err(e) = r {
                                        log::error!("Failed to send validation response {e:#?}");
                                    }
                                }
                                InternalMessageKind::Hangup => {
                                    break;
                                }
                                // Every other kind is ignored when it is not a reply (this
                                // includes `UserMessageSchema` when validation is compiled
                                // out).
                                _ => {}
                            }
                        }
                    }
                }
            }
        }
    }
    // Reported on every exit path; a closed status channel is ignored.
    let _ = status_sender.send(ConnectionStatus::DisconnectedCleanly);
}
376
/// Builds the prefix put in front of this crate's log lines:
/// `"<executable file name> as Server: "` or `"<executable file name> as Client: "`.
///
/// The executable name is taken from the first process argument; `"Unknown"` is used when
/// there is no first argument or it has no file-name component.
fn get_log_prefix(is_server: bool) -> String {
    let role = if is_server { "Server" } else { "Client" };
    let executable = env::args()
        .next()
        .unwrap_or_else(|| String::from("Unknown"));
    let process = Path::new(&executable)
        .file_name()
        .unwrap_or_else(|| "Unknown".as_ref())
        .to_string_lossy();
    format!("{process} as {role}: ")
}
391
/// State of a connection as published to observers.
#[derive(Clone, Debug)]
pub enum ConnectionStatus {
    /// No client has connected yet.
    WaitingForClient,
    /// The connection is up.
    Connected,
    /// The session has ended without an error being recorded. This is the status
    /// `process_incoming_mail` publishes when its loop exits.
    DisconnectedCleanly,
    /// The session has ended with the contained error.
    DisconnectError(IpcRpcError),
}
404
405impl ConnectionStatus {
406 pub fn session_end_result(&self) -> Option<Result<(), IpcRpcError>> {
407 match self {
408 ConnectionStatus::WaitingForClient | ConnectionStatus::Connected => None,
409 ConnectionStatus::DisconnectedCleanly => Some(Ok(())),
410 ConnectionStatus::DisconnectError(e) => Some(Err(e.clone())),
411 }
412 }
413}
414
/// Future that resolves to the outcome of a single awaited reply.
///
/// It completes with whatever is sent on the channel (a reply or an error), or with
/// [`IpcRpcError::ConnectionDropped`] if the channel closes without anything being sent.
struct IpcReplyFuture<U: UserMessage> {
    /// Receiving half of the channel on which the outcome is delivered.
    receiver: mpsc::UnboundedReceiver<Result<InternalMessageKind<U>, IpcRpcError>>,
}
420
421impl<U: UserMessage> Future for IpcReplyFuture<U> {
422 type Output = Result<InternalMessageKind<U>, IpcRpcError>;
423
424 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
425 self.receiver.poll_recv(cx).map(|o| match o {
426 Some(m) => m,
427 None => Err(IpcRpcError::ConnectionDropped),
428 })
429 }
430}
431
/// Async [`Stream`] adapter over a blocking [`IpcReceiver`].
///
/// `new` starts a blocking task that reads from the IPC receiver and forwards every message
/// or receive error into a tokio channel; this struct is the receiving end of that channel.
struct IpcReceiveStream<T> {
    /// Receives what the blocking reader task forwards.
    receiver: mpsc::UnboundedReceiver<Result<T, IpcError>>,
}
435
436impl<T> IpcReceiveStream<T>
437where
438 T: Send + for<'de> Deserialize<'de> + Serialize + 'static,
439{
440 pub fn new(ipc_receiver: IpcReceiver<T>) -> Self {
441 let (sender, receiver) = mpsc::unbounded_channel();
442 tokio::task::spawn_blocking(move || loop {
443 match ipc_receiver.try_recv_timeout(Duration::from_millis(250)) {
444 Ok(msg) => {
445 if sender.send(Ok(msg)).is_err() {
446 break;
447 }
448 }
449 Err(TryRecvError::IpcError(e)) => {
450 if sender.send(Err(e)).is_err() {
451 break;
452 }
453 }
454 Err(TryRecvError::Empty) => {
455 if sender.is_closed() {
456 break;
457 }
458 }
459 }
460 });
461 Self { receiver }
462 }
463}
464
465impl<T> Stream for IpcReceiveStream<T> {
466 type Item = Result<T, IpcError>;
467
468 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
469 self.receiver.poll_recv(cx)
470 }
471}
472
/// Outcome of comparing the user message schema of both ends of a connection.
#[derive(Debug, Clone)]
pub enum SchemaValidationStatus {
    /// Validation was not compiled in.
    // NOTE(review): presumably reported when the `message-schema-validation` feature is off —
    // confirm against the client/server modules.
    ValidationDisabledAtCompileTime,
    /// Validation did not run to a proper conclusion.
    ValidationNotPerformedProperly,
    /// The exchange needed for validation failed with the contained error.
    ValidationCommunicationFailed(IpcRpcError),
    /// Both ends use the same schema. This is the only variant `is_success` accepts.
    SchemasMatched,
    /// The two ends use different schemas.
    SchemaMismatch {
        /// Schema of this end.
        our_schema: String,
        /// Schema reported by the other end.
        their_schema: String,
    },
}
491
492impl SchemaValidationStatus {
493 pub fn is_success(&self) -> bool {
495 matches!(self, SchemaValidationStatus::SchemasMatched)
496 }
497
498 pub fn assert_success(&self) {
500 if !self.is_success() {
501 panic!("ipc-rpc user message schema failed to validate, error {self:#?}");
502 }
503 }
504}
505
/// Bounds a type must meet to be sent as a user message (schema-validation build).
///
/// With the `message-schema-validation` feature enabled the bounds include [`JsonSchema`],
/// which `process_incoming_mail` needs in order to call `schema_for!` on the message type.
/// The blanket impl below covers every type meeting the bounds, so there is nothing to
/// implement manually.
#[cfg(feature = "message-schema-validation")]
pub trait UserMessage:
    'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
{
}

/// Blanket impl: every type satisfying the bounds is a [`UserMessage`].
#[cfg(feature = "message-schema-validation")]
impl<T> UserMessage for T where
    T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
{
}
532
/// Bounds a type must meet to be sent as a user message (build without schema validation).
///
/// Same as the schema-validation variant of this trait minus the `JsonSchema` requirement.
/// The blanket impl below covers every type meeting the bounds.
#[cfg(not(feature = "message-schema-validation"))]
pub trait UserMessage: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}

/// Blanket impl: every type satisfying the bounds is a [`UserMessage`].
#[cfg(not(feature = "message-schema-validation"))]
impl<T> UserMessage for T where T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
540
/// Sends a message and handles the one reply shape the caller expects.
///
/// Expands to `$sender.send($to_send).await.map(..)`, so it must be used in an async context
/// and evaluates to the `Result` of `send` with the success value replaced by the value of
/// the `$to_do` block. The three labelled arguments must appear in this order and the
/// trailing comma after the block is required:
///
/// ```ignore
/// rpc_call!(
///     sender: server,
///     to_send: request,
///     receiver: ExpectedReplyPattern => { /* runs when the reply matches */ },
/// )
/// ```
///
/// # Panics
///
/// The expansion panics if a reply is received but does not match the `receiver` pattern.
#[macro_export]
macro_rules! rpc_call {
    (sender: $sender:expr, to_send: $to_send:expr, receiver: $received:pat_param => $to_do:block,) => {
        $sender.send($to_send).await.map(|m| match m {
            $received => $to_do,
            // Any other reply shape is treated as a protocol violation.
            _ => panic!("rpc_call response didn't match given pattern"),
        })
    };
}
593
594#[cfg(test)]
595mod tests {
596 use tokio::time::timeout;
597
598 use super::*;
599
// The test message types below derive `JsonSchema`, which is only imported when the
// `message-schema-validation` feature is enabled, so refuse to build the tests without it.
#[cfg(not(feature = "message-schema-validation"))]
compile_error!("Tests must be executed with all features on");
602
/// User message type exchanged by the tests in this module.
#[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
pub struct IpcProtocolMessage {
    /// Which test message this is.
    pub kind: IpcProtocolMessageKind,
}
607
/// Message kinds used by the tests in this module.
#[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
pub enum IpcProtocolMessageKind {
    /// Request sent by either side.
    TestMessage,
    /// Reply produced by a client-side handler.
    ClientTestReply,
    /// Reply produced by a server-side handler.
    ServerTestReply,
}
614
615 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
616 async fn basic_dialogue() {
617 let (server_key, mut server) =
618 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
619 match message.kind {
620 IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
621 kind: IpcProtocolMessageKind::ServerTestReply,
622 }),
623 _ => None,
624 }
625 })
626 .await
627 .unwrap();
628 let mut client = client::IpcRpcClient::initialize_client(
629 server_key,
630 |message: IpcProtocolMessage| async move {
631 match message.kind {
632 IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
633 kind: IpcProtocolMessageKind::ClientTestReply,
634 }),
635 _ => None,
636 }
637 },
638 )
639 .await
640 .unwrap();
641 server.schema_validated().await.unwrap().assert_success();
642 client.schema_validated().await.unwrap().assert_success();
643 let client_reply = server
644 .send(IpcProtocolMessage {
645 kind: IpcProtocolMessageKind::TestMessage,
646 })
647 .await;
648 if !matches!(
649 client_reply.as_ref().map(|r| &r.kind),
650 Ok(IpcProtocolMessageKind::ClientTestReply)
651 ) {
652 panic!("client reply was of unexpected type: {:?}", client_reply);
653 }
654 let server_reply = client
655 .send(IpcProtocolMessage {
656 kind: IpcProtocolMessageKind::TestMessage,
657 })
658 .await;
659 if !matches!(
660 server_reply.as_ref().map(|r| &r.kind),
661 Ok(IpcProtocolMessageKind::ServerTestReply)
662 ) {
663 panic!("server reply was of unexpected type: {:?}", server_reply);
664 }
665 }
666
667 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
668 async fn send_without_await() {
669 let (server_success_sender, mut server_success_receiver) = mpsc::unbounded_channel();
670 let (client_success_sender, mut client_success_receiver) = mpsc::unbounded_channel();
671 let (server_key, mut server) =
672 server::IpcRpcServer::initialize_server(move |message: IpcProtocolMessage| {
673 let server_success_sender = server_success_sender.clone();
674 async move {
675 match message.kind {
676 IpcProtocolMessageKind::TestMessage => {
677 server_success_sender.send(()).unwrap()
678 }
679 _ => {}
680 }
681 None
682 }
683 })
684 .await
685 .unwrap();
686 let mut client = client::IpcRpcClient::initialize_client(
687 server_key,
688 move |message: IpcProtocolMessage| {
689 let client_success_sender = client_success_sender.clone();
690 async move {
691 match message.kind {
692 IpcProtocolMessageKind::TestMessage => {
693 client_success_sender.send(()).unwrap()
694 }
695 _ => {}
696 }
697 None
698 }
699 },
700 )
701 .await
702 .unwrap();
703 server.schema_validated().await.unwrap().assert_success();
704 client.schema_validated().await.unwrap().assert_success();
705 let _ = server.send(IpcProtocolMessage {
706 kind: IpcProtocolMessageKind::TestMessage,
707 });
708 let _ = client.send(IpcProtocolMessage {
709 kind: IpcProtocolMessageKind::TestMessage,
710 });
711 assert_eq!(
712 timeout(Duration::from_secs(3), server_success_receiver.recv()).await,
713 Ok(Some(()))
714 );
715 assert_eq!(
716 timeout(Duration::from_secs(3), client_success_receiver.recv()).await,
717 Ok(Some(()))
718 );
719 }
720
721 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
722 async fn timeout_test() {
723 let (server_key, mut server) =
724 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
725 match message.kind {
726 _ => None,
727 }
728 })
729 .await
730 .unwrap();
731 let mut client = client::IpcRpcClient::initialize_client(
732 server_key,
733 |message: IpcProtocolMessage| async move {
734 match message.kind {
735 _ => None,
736 }
737 },
738 )
739 .await
740 .unwrap();
741 server.schema_validated().await.unwrap().assert_success();
742 client.schema_validated().await.unwrap().assert_success();
743 let wait_start = Instant::now();
744 let client_reply = server
745 .send(IpcProtocolMessage {
746 kind: IpcProtocolMessageKind::TestMessage,
747 })
748 .await;
749 assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
750 if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
751 panic!("client reply was of unexpected type: {:?}", client_reply);
752 }
753 let wait_start = Instant::now();
754 let server_reply = client
755 .send(IpcProtocolMessage {
756 kind: IpcProtocolMessageKind::TestMessage,
757 })
758 .await;
759 assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
760 if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
761 panic!("server reply was of unexpected type: {:?}", server_reply);
762 }
763 }
764
765 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
766 async fn custom_timeout_test() {
767 let (server_key, mut server) =
768 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
769 match message.kind {
770 _ => None,
771 }
772 })
773 .await
774 .unwrap();
775 let mut client = client::IpcRpcClient::initialize_client(
776 server_key,
777 |message: IpcProtocolMessage| async move {
778 match message.kind {
779 _ => None,
780 }
781 },
782 )
783 .await
784 .unwrap();
785 server.schema_validated().await.unwrap().assert_success();
786 client.schema_validated().await.unwrap().assert_success();
787 let custom_timeout: Duration = DEFAULT_REPLY_TIMEOUT / 2;
788 let wait_start = Instant::now();
789 let client_reply = server
790 .send_timeout(
791 IpcProtocolMessage {
792 kind: IpcProtocolMessageKind::TestMessage,
793 },
794 custom_timeout,
795 )
796 .await;
797 assert!(wait_start.elapsed() >= custom_timeout);
798 assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
799 if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
800 panic!("client reply was of unexpected type: {:?}", client_reply);
801 }
802 let wait_start = Instant::now();
803 let server_reply = client
804 .send_timeout(
805 IpcProtocolMessage {
806 kind: IpcProtocolMessageKind::TestMessage,
807 },
808 custom_timeout,
809 )
810 .await;
811 assert!(wait_start.elapsed() >= custom_timeout);
812 assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
813 if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
814 panic!("server reply was of unexpected type: {:?}", server_reply);
815 }
816 }
817 #[test_log::test]
818 fn server_drop_does_not_hang() {
819 let thread = std::thread::spawn(|| {
820 let runtime = tokio::runtime::Runtime::new().unwrap();
821 runtime.block_on(async {
822 let (_server_key, _server) = server::IpcRpcServer::initialize_server(
823 |message: IpcProtocolMessage| async move {
824 match message.kind {
825 _ => None,
826 }
827 },
828 )
829 .await
830 .unwrap();
831 })
832 });
833
834 let start = Instant::now();
835 let timeout = Duration::from_secs(5);
836 while !thread.is_finished() {
837 if start.elapsed() >= timeout {
838 std::process::exit(1);
840 }
841 }
842 }
843
/// Dropping the server must be noticed by the client, and must release the server's message
/// handler.
///
/// An `Arc<()>` serves as a drop detector: its strong count shows how many owners of the
/// handler's captured state are still alive.
#[test_log::test]
fn server_disconnect_test() {
    let drop_detector = Arc::new(());
    // Second reference; it is moved into the async block below.
    let drop_detector_clone = drop_detector.clone();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let (server_key, server) = server::IpcRpcServer::initialize_server({
            // Third reference; it is owned by the handler closure.
            let drop_detector_clone = drop_detector_clone.clone();
            move |message: IpcProtocolMessage| {
                // Per-invocation reference owned by the returned future.
                let drop_detector_clone = drop_detector_clone.clone();
                async move {
                    match message.kind {
                        _ => {
                            // This clone is created and dropped immediately; it only makes
                            // the future use the captured reference.
                            let _ = drop_detector_clone.clone();
                            None
                        }
                    }
                }
            }
        })
        .await
        .unwrap();
        let client = client::IpcRpcClient::initialize_client(
            server_key,
            |message: IpcProtocolMessage| async move {
                match message.kind {
                    _ => None,
                }
            },
        )
        .await
        .unwrap();
        // Original + the async block's reference + the handler closure's reference.
        assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
        drop(server);
        client.wait_for_server_to_disconnect().await.unwrap();
    });
    let start_shutdown = Instant::now();
    // Shutdown is allowed up to five seconds but is expected to take less than three.
    runtime.shutdown_timeout(Duration::from_secs(5));
    assert!(start_shutdown.elapsed() < Duration::from_secs(3));
    // Only the original reference may remain, i.e. the handler closure has been dropped.
    assert_eq!(Arc::strong_count(&drop_detector), 1);
}
893
/// Dropping the client must be noticed by the server, and must release the client's message
/// handler.
///
/// An `Arc<()>` serves as a drop detector: its strong count shows how many owners of the
/// handler's captured state are still alive.
#[test_log::test]
fn client_disconnect_test() {
    // Takes precedence over the `Instant` brought in by the glob import for this function.
    use std::time::Instant;

    let drop_detector = Arc::new(());
    // Second reference; it is moved into the async block below.
    let drop_detector_clone = drop_detector.clone();
    let runtime = tokio::runtime::Runtime::new().unwrap();
    runtime.block_on(async move {
        let (server_key, mut server) =
            server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
                match message.kind {
                    _ => None,
                }
            })
            .await
            .unwrap();
        let client = client::IpcRpcClient::initialize_client(server_key, {
            // Third reference; it is owned by the handler closure.
            let drop_detector_clone = drop_detector_clone.clone();
            move |message: IpcProtocolMessage| {
                // Per-invocation reference, dropped when the closure call returns.
                let _drop_detector_clone = drop_detector_clone.clone();
                async move {
                    match message.kind {
                        _ => None,
                    }
                }
            }
        })
        .await
        .unwrap();
        // Original + the async block's reference + the handler closure's reference.
        assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
        drop(client);
        server.wait_for_client_to_disconnect().await.unwrap();
    });
    let start_shutdown = Instant::now();
    // Shutdown is allowed up to five seconds but is expected to take less than three.
    runtime.shutdown_timeout(Duration::from_secs(5));
    assert!(start_shutdown.elapsed() < Duration::from_secs(3));
    // Only the original reference may remain, i.e. the handler closure has been dropped.
    assert_eq!(Arc::strong_count(&drop_detector), 1);
}
938
939 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
940 async fn rpc_call_macro_test() {
941 let (server_key, mut server) =
942 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
943 match message.kind {
944 _ => Some(IpcProtocolMessage {
945 kind: IpcProtocolMessageKind::ServerTestReply,
946 }),
947 }
948 })
949 .await
950 .unwrap();
951 let mut client = client::IpcRpcClient::initialize_client(
952 server_key,
953 |message: IpcProtocolMessage| async move {
954 match message.kind {
955 _ => Some(IpcProtocolMessage {
956 kind: IpcProtocolMessageKind::ClientTestReply,
957 }),
958 }
959 },
960 )
961 .await
962 .unwrap();
963 server.schema_validated().await.unwrap().assert_success();
964 client.schema_validated().await.unwrap().assert_success();
965 rpc_call!(
966 sender: server,
967 to_send: IpcProtocolMessage {
968 kind: IpcProtocolMessageKind::TestMessage
969 },
970 receiver: IpcProtocolMessage {
971 kind: IpcProtocolMessageKind::ClientTestReply
972 } => {
973
974 },
975 )
976 .unwrap();
977 rpc_call!(
978 sender: client,
979 to_send: IpcProtocolMessage {
980 kind: IpcProtocolMessageKind::TestMessage
981 },
982 receiver: IpcProtocolMessage {
983 kind: IpcProtocolMessageKind::ServerTestReply
984 } => {
985
986 },
987 )
988 .unwrap();
989 }
990}