1use std::{
63 collections::HashMap,
64 convert::Infallible,
65 env,
66 ffi::OsString,
67 fmt::{self, Debug, Display},
68 future::Future,
69 io,
70 path::Path,
71 pin::Pin,
72 str::FromStr,
73 sync::Arc,
74 task::{Context, Poll},
75};
76
77use futures::{Stream, StreamExt};
78use ipc_channel::{
79 ipc::{IpcReceiver, IpcSender},
80 IpcError, TryRecvError,
81};
82use serde::{de::DeserializeOwned, Deserialize, Serialize};
83use thiserror::Error;
84use tokio::{
85 sync::{mpsc, watch},
86 time::{Duration, Instant},
87};
88use uuid::Uuid;
89
90#[cfg(feature = "message-schema-validation")]
91use schemars::{schema_for, JsonSchema, Schema};
92
93mod client;
94pub use client::*;
95mod server;
96pub use server::*;
97
98#[derive(Deserialize, Serialize, Debug, Clone)]
100pub struct ConnectionKey(String);
101
102impl From<String> for ConnectionKey {
103 fn from(s: String) -> Self {
104 Self(s)
105 }
106}
107
108impl FromStr for ConnectionKey {
109 type Err = Infallible;
110
111 fn from_str(s: &str) -> Result<Self, Self::Err> {
112 Ok(Self(s.to_string()))
113 }
114}
115
116impl Display for ConnectionKey {
117 fn fmt(&self, f: &mut fmt::Formatter) -> Result<(), fmt::Error> {
118 write!(f, "{}", self.0)
119 }
120}
121
122impl From<ConnectionKey> for OsString {
123 fn from(s: ConnectionKey) -> Self {
124 OsString::from(s.0)
125 }
126}
127
128impl From<ConnectionKey> for String {
129 fn from(key: ConnectionKey) -> Self {
130 key.0
131 }
132}
133
/// One registration of an outstanding request awaiting its reply.
///
/// The tuple holds, in order:
/// * the `Uuid` of the request (a reply is recognised by carrying the same id),
/// * the channel on which the reply, or an error, is delivered to the waiter,
/// * the `Instant` after which the waiter is sent `IpcRpcError::ReplyTimeout`.
type PendingReplyEntry<U> = (
    Uuid,
    (
        mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
        Instant,
    ),
);
/// Envelope for everything sent over the IPC channel.
///
/// The `deserialize = ""` bound removes the bound serde would otherwise infer for `U`;
/// `UserMessage` already requires `DeserializeOwned`.
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(bound(deserialize = ""))]
struct InternalMessage<U: UserMessage> {
    /// Correlation id. A reply is sent with the same `uuid` as the message it answers.
    uuid: uuid::Uuid,
    /// The payload carried by this envelope.
    kind: InternalMessageKind<U>,
}
156
/// The payloads an `InternalMessage` can carry.
#[derive(Deserialize, Serialize, Debug, Clone)]
#[serde(bound(deserialize = ""))]
enum InternalMessageKind<U: UserMessage> {
    /// Carries a sender for `InternalMessage`s.
    /// NOTE(review): presumably the first message of the handshake, giving the peer a
    /// channel to answer on — confirm in the `client`/`server` modules.
    InitConnection(IpcSender<InternalMessage<U>>),
    /// Asks the receiving side to stop processing incoming mail.
    Hangup,
    /// A message defined by the user of this crate.
    UserMessage(U),
    /// The sender's message schema, serialized as JSON, to be compared by the receiver.
    UserMessageSchema(String),
    /// Answer to `UserMessageSchema`: the schemas are equal.
    UserMessageSchemaOk,
    /// Answer to `UserMessageSchema`: the schemas differ or the received one could not
    /// be parsed. `other_schema` is the answering side's own schema as JSON.
    UserMessageSchemaError { other_schema: String },
}
176
177#[derive(Clone, Debug, Error)]
179pub enum IpcRpcError {
180 #[error("io error")]
181 IoError(#[from] Arc<io::Error>),
182 #[error("internal ipc channel error")]
183 IpcChannelError(#[from] Arc<IpcError>),
184 #[error("connection initialization timed out")]
185 ConnectTimeout,
186 #[error("connection established, but initial handshake was not performed properly")]
187 HandshakeFailure,
188 #[error("client already connected")]
189 ClientAlreadyConnected,
190 #[error("peer disconnected")]
191 Disconnected,
192 #[error("time out while waiting for a reply")]
193 ReplyTimeout,
194 #[error("connection dropped pre-emptively")]
195 ConnectionDropped,
196}
197
198impl From<io::Error> for IpcRpcError {
199 fn from(e: io::Error) -> Self {
200 Self::IoError(Arc::new(e))
201 }
202}
203
204impl From<IpcError> for IpcRpcError {
205 fn from(e: IpcError) -> Self {
206 Self::IpcChannelError(Arc::new(e))
207 }
208}
209
/// Default time to wait for a reply: five seconds.
///
/// NOTE(review): the tests in this file expect a plain `send` that gets no reply to
/// fail with `IpcRpcError::ReplyTimeout` no sooner than this duration — confirm where
/// it is applied in the `client`/`server` modules.
pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
221
222async fn process_incoming_mail<
225 Fut: Future<Output = Option<U>> + Send,
226 F: Fn(U) -> Fut + Send + Sync + 'static,
227 U: UserMessage,
228>(
229 is_server: bool,
230 mut pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
231 mut receiver: IpcReceiveStream<InternalMessage<U>>,
232 message_handler: F,
233 response_sender: IpcSender<InternalMessage<U>>,
234 status_sender: watch::Sender<ConnectionStatus>,
235) {
236 let mut pending_replies = HashMap::<
237 Uuid,
238 (
239 mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
240 Instant,
241 ),
242 >::new();
243 let message_handler = Arc::new(message_handler);
244 let log_prefix = get_log_prefix(is_server);
245 log::info!("{}Processing incoming mail!", log_prefix);
246 let mut consecutive_error_count = 0;
247 let mut pending_reply_scheduled_time = Option::<Instant>::None;
248
249 let add_pending_reply = |pending_reply_scheduled_time: &mut Option<Instant>,
250 pending_replies: &mut HashMap<_, _>,
251 (key, (sender, timeout))| {
252 *pending_reply_scheduled_time = Some(
253 pending_reply_scheduled_time
254 .map(|t| t.min(timeout))
255 .unwrap_or(timeout),
256 );
257
258 pending_replies.insert(key, (sender, timeout));
259 };
260 loop {
261 while let Ok(pending_reply) = pending_reply_receiver.try_recv() {
264 add_pending_reply(
265 &mut pending_reply_scheduled_time,
266 &mut pending_replies,
267 pending_reply,
268 );
269 }
270 tokio::select! {
271 true = async { if let Some(t) = pending_reply_scheduled_time { tokio::time::sleep_until(t).await; true } else { false } } => {
272 pending_replies.retain(|_k, v| {
273 let keep = v.1 > Instant::now();
274 if !keep {
275 let _ = v.0.send(Err(IpcRpcError::ReplyTimeout));
276 }
277 keep
278 });
279 pending_reply_scheduled_time = pending_replies.values().map(|i| i.1).min();
280 }
281 pending_reply = pending_reply_receiver.recv() => {
282 match pending_reply {
283 None => {
284 break;
286 }
287 Some(pending_reply) => {
288 add_pending_reply(&mut pending_reply_scheduled_time, &mut pending_replies, pending_reply);
289 }
290 }
291 },
292 r = receiver.next() => {
293 match r {
294 None => {
295 break;
297 }
298 Some(Err(e)) => {
299 if let IpcError::Disconnected = e {
300 log::info!("{}Peer disconnected.", log_prefix);
301 break;
302 } else {
303 log::error!("{}Error receiving message from peer {:?}", log_prefix, e);
304 consecutive_error_count += 1;
305 if consecutive_error_count > 20 {
306 log::error!("{}Too many consecutive errors, shutting down.", log_prefix);
307 break;
308 }
309 }
310 }
311 Some(Ok(message)) => {
312 consecutive_error_count = 0;
313 log::debug!("{}Got message! {:?}", log_prefix, message);
314 let reply = pending_replies.remove(&message.uuid);
315 if let Some((reply_drop_box, _)) = reply {
316 log::debug!("{}It's a reply, forwarding!", log_prefix);
317 let _ = reply_drop_box.send(Ok(message.kind));
319 } else {
320 log::debug!("{}It's not a reply, handling!", log_prefix);
321 let message_uuid = message.uuid;
322 match message.kind {
323 InternalMessageKind::UserMessage(user_message) => {
324 let message_handler = Arc::clone(&message_handler);
325 let response_sender = response_sender.clone();
326 tokio::spawn(async move {
327 if let Some(m) = message_handler(user_message).await {
328 let r = response_sender.send(InternalMessage {
329 uuid: message_uuid,
330 kind: InternalMessageKind::UserMessage(m),
331 });
332 if let Err(e) = r {
333 log::error!("Failed to send reply {e:?}");
334 }
335 }
336 });
337 }
338 #[cfg(feature = "message-schema-validation")]
339 InternalMessageKind::UserMessageSchema(other_schema) => {
340 let my_schema = schema_for!(U);
341 let kind = match serde_json::from_str::<Schema>(&other_schema) {
342 Ok(other_schema) => {
343 if other_schema == my_schema {
344 InternalMessageKind::UserMessageSchemaOk
345 } else {
346 InternalMessageKind::UserMessageSchemaError {
347 other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
348 }
349 }
350 },
351 Err(_) => {
352 log::error!("Failed to deserialize incoming schema properly, got {other_schema:?}");
353 InternalMessageKind::UserMessageSchemaError {
354 other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
355 }
356 }
357 };
358 let r = response_sender.send(InternalMessage {
359 uuid: message_uuid,
360 kind,
361 });
362 if let Err(e) = r {
363 log::error!("Failed to send validation response {e:#?}");
364 }
365 }
366 InternalMessageKind::Hangup => {
367 break;
368 }
369 _ => {}
370 }
371 }
372 }
373 }
374 }
375 }
376 }
377 let _ = status_sender.send(ConnectionStatus::DisconnectedCleanly);
378}
379
/// Builds the prefix put in front of this crate's log lines.
///
/// The prefix is `"<process> as Server: "` or `"<process> as Client: "`, where
/// `<process>` is the file name of the first command-line argument, or `"Unknown"` when
/// there is no first argument or it has no file name component.
fn get_log_prefix(is_server: bool) -> String {
    let executable = env::args()
        .next()
        .unwrap_or_else(|| String::from("Unknown"));
    let process = match Path::new(&executable).file_name() {
        Some(name) => name.to_string_lossy(),
        None => "Unknown".into(),
    };
    let role = if is_server { "Server" } else { "Client" };
    format!("{} as {}: ", process, role)
}
394
395#[derive(Clone, Debug)]
397pub enum ConnectionStatus {
398 WaitingForClient,
400 Connected,
402 DisconnectedCleanly,
404 DisconnectError(IpcRpcError),
406}
407
408impl ConnectionStatus {
409 pub fn session_end_result(&self) -> Option<Result<(), IpcRpcError>> {
410 match self {
411 ConnectionStatus::WaitingForClient | ConnectionStatus::Connected => None,
412 ConnectionStatus::DisconnectedCleanly => Some(Ok(())),
413 ConnectionStatus::DisconnectError(e) => Some(Err(e.clone())),
414 }
415 }
416}
417
418struct IpcReplyFuture<U: UserMessage> {
421 receiver: mpsc::UnboundedReceiver<Result<InternalMessageKind<U>, IpcRpcError>>,
422}
423
424impl<U: UserMessage> Future for IpcReplyFuture<U> {
425 type Output = Result<InternalMessageKind<U>, IpcRpcError>;
426
427 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
428 self.receiver.poll_recv(cx).map(|o| match o {
429 Some(m) => m,
430 None => Err(IpcRpcError::ConnectionDropped),
431 })
432 }
433}
434
435struct IpcReceiveStream<T> {
436 receiver: mpsc::UnboundedReceiver<Result<T, IpcError>>,
437}
438
439impl<T> IpcReceiveStream<T>
440where
441 T: Send + for<'de> Deserialize<'de> + Serialize + 'static,
442{
443 pub fn new(ipc_receiver: IpcReceiver<T>) -> Self {
444 let (sender, receiver) = mpsc::unbounded_channel();
445 tokio::task::spawn_blocking(move || loop {
446 match ipc_receiver.try_recv_timeout(Duration::from_millis(250)) {
447 Ok(msg) => {
448 if sender.send(Ok(msg)).is_err() {
449 break;
450 }
451 }
452 Err(TryRecvError::IpcError(e)) => {
453 if sender.send(Err(e)).is_err() {
454 break;
455 }
456 }
457 Err(TryRecvError::Empty) => {
458 if sender.is_closed() {
459 break;
460 }
461 }
462 }
463 });
464 Self { receiver }
465 }
466}
467
468impl<T> Stream for IpcReceiveStream<T> {
469 type Item = Result<T, IpcError>;
470
471 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
472 self.receiver.poll_recv(cx)
473 }
474}
475
476#[derive(Debug, Clone)]
479pub enum SchemaValidationStatus {
480 ValidationDisabledAtCompileTime,
482 ValidationNotPerformedProperly,
484 ValidationCommunicationFailed(IpcRpcError),
486 SchemasMatched,
488 SchemaMismatch {
490 our_schema: String,
491 their_schema: String,
492 },
493}
494
495impl SchemaValidationStatus {
496 pub fn is_success(&self) -> bool {
498 matches!(self, SchemaValidationStatus::SchemasMatched)
499 }
500
501 pub fn assert_success(&self) {
503 if !self.is_success() {
504 panic!("ipc-rpc user message schema failed to validate, error {self:#?}");
505 }
506 }
507}
508
/// Bound collecting everything a user-defined message type must provide.
///
/// With the `message-schema-validation` feature enabled the type must also implement
/// `JsonSchema`. The trait has no methods and is implemented automatically for every
/// type that satisfies the bounds.
#[cfg(feature = "message-schema-validation")]
pub trait UserMessage:
    'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
{
}

/// Blanket implementation: any type meeting the bounds is a `UserMessage`.
#[cfg(feature = "message-schema-validation")]
impl<T> UserMessage for T where
    T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
{
}

/// Bound collecting everything a user-defined message type must provide.
///
/// This is the variant used when the `message-schema-validation` feature is disabled,
/// so `JsonSchema` is not required. The trait has no methods and is implemented
/// automatically for every type that satisfies the bounds.
#[cfg(not(feature = "message-schema-validation"))]
pub trait UserMessage: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}

/// Blanket implementation: any type meeting the bounds is a `UserMessage`.
#[cfg(not(feature = "message-schema-validation"))]
impl<T> UserMessage for T where T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
543
/// Sends a message and handles the one reply shape the caller expects.
///
/// `sender` must have an async `send` method returning a `Result`. The macro awaits
/// `sender.send(to_send)` and, on `Ok`, matches the reply against the `receiver`
/// pattern and evaluates the block. The whole expression has the type of that `Result`
/// with the `Ok` value replaced by the block's value. It must be used in an async
/// context because it expands to an `.await`.
///
/// The comma after the handler block is optional.
///
/// ```ignore
/// rpc_call!(
///     sender: server,
///     to_send: Request::Ping,
///     receiver: Reply::Pong => { println!("pong") },
/// )?;
/// ```
///
/// # Panics
///
/// Panics if a reply arrives that does not match the given pattern.
#[macro_export]
macro_rules! rpc_call {
    (sender: $sender:expr, to_send: $to_send:expr, receiver: $received:pat_param => $to_do:block $(,)?) => {
        $sender.send($to_send).await.map(|m| match m {
            $received => $to_do,
            _ => panic!("rpc_call response didn't match given pattern"),
        })
    };
}
596
597#[cfg(test)]
598mod tests {
599 use tokio::time::timeout;
600
601 use super::*;
602
603 #[cfg(not(feature = "message-schema-validation"))]
604 compile_error!("Tests must be executed with all features on");
605
    /// Minimal user message type used by the tests in this module.
    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
    pub struct IpcProtocolMessage {
        /// Which test message or reply this is.
        pub kind: IpcProtocolMessageKind,
    }
610
    /// The kinds of message exchanged by the tests in this module.
    #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
    pub enum IpcProtocolMessageKind {
        /// The request sent by either side.
        TestMessage,
        /// The reply produced by client-side handlers.
        ClientTestReply,
        /// The reply produced by server-side handlers.
        ServerTestReply,
    }
617
618 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
619 async fn basic_dialogue() {
620 let (server_key, mut server) =
621 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
622 match message.kind {
623 IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
624 kind: IpcProtocolMessageKind::ServerTestReply,
625 }),
626 _ => None,
627 }
628 })
629 .await
630 .unwrap();
631 let mut client = client::IpcRpcClient::initialize_client(
632 server_key,
633 |message: IpcProtocolMessage| async move {
634 match message.kind {
635 IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
636 kind: IpcProtocolMessageKind::ClientTestReply,
637 }),
638 _ => None,
639 }
640 },
641 )
642 .await
643 .unwrap();
644 server.schema_validated().await.unwrap().assert_success();
645 client.schema_validated().await.unwrap().assert_success();
646 let client_reply = server
647 .send(IpcProtocolMessage {
648 kind: IpcProtocolMessageKind::TestMessage,
649 })
650 .await;
651 if !matches!(
652 client_reply.as_ref().map(|r| &r.kind),
653 Ok(IpcProtocolMessageKind::ClientTestReply)
654 ) {
655 panic!("client reply was of unexpected type: {:?}", client_reply);
656 }
657 let server_reply = client
658 .send(IpcProtocolMessage {
659 kind: IpcProtocolMessageKind::TestMessage,
660 })
661 .await;
662 if !matches!(
663 server_reply.as_ref().map(|r| &r.kind),
664 Ok(IpcProtocolMessageKind::ServerTestReply)
665 ) {
666 panic!("server reply was of unexpected type: {:?}", server_reply);
667 }
668 }
669
670 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
671 async fn send_without_await() {
672 let (server_success_sender, mut server_success_receiver) = mpsc::unbounded_channel();
673 let (client_success_sender, mut client_success_receiver) = mpsc::unbounded_channel();
674 let (server_key, mut server) =
675 server::IpcRpcServer::initialize_server(move |message: IpcProtocolMessage| {
676 let server_success_sender = server_success_sender.clone();
677 async move {
678 match message.kind {
679 IpcProtocolMessageKind::TestMessage => {
680 server_success_sender.send(()).unwrap()
681 }
682 _ => {}
683 }
684 None
685 }
686 })
687 .await
688 .unwrap();
689 let mut client = client::IpcRpcClient::initialize_client(
690 server_key,
691 move |message: IpcProtocolMessage| {
692 let client_success_sender = client_success_sender.clone();
693 async move {
694 match message.kind {
695 IpcProtocolMessageKind::TestMessage => {
696 client_success_sender.send(()).unwrap()
697 }
698 _ => {}
699 }
700 None
701 }
702 },
703 )
704 .await
705 .unwrap();
706 server.schema_validated().await.unwrap().assert_success();
707 client.schema_validated().await.unwrap().assert_success();
708 let _ = server.send(IpcProtocolMessage {
709 kind: IpcProtocolMessageKind::TestMessage,
710 });
711 let _ = client.send(IpcProtocolMessage {
712 kind: IpcProtocolMessageKind::TestMessage,
713 });
714 assert_eq!(
715 timeout(Duration::from_secs(3), server_success_receiver.recv()).await,
716 Ok(Some(()))
717 );
718 assert_eq!(
719 timeout(Duration::from_secs(3), client_success_receiver.recv()).await,
720 Ok(Some(()))
721 );
722 }
723
724 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
725 async fn timeout_test() {
726 let (server_key, mut server) =
727 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
728 match message.kind {
729 _ => None,
730 }
731 })
732 .await
733 .unwrap();
734 let mut client = client::IpcRpcClient::initialize_client(
735 server_key,
736 |message: IpcProtocolMessage| async move {
737 match message.kind {
738 _ => None,
739 }
740 },
741 )
742 .await
743 .unwrap();
744 server.schema_validated().await.unwrap().assert_success();
745 client.schema_validated().await.unwrap().assert_success();
746 let wait_start = Instant::now();
747 let client_reply = server
748 .send(IpcProtocolMessage {
749 kind: IpcProtocolMessageKind::TestMessage,
750 })
751 .await;
752 assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
753 if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
754 panic!("client reply was of unexpected type: {:?}", client_reply);
755 }
756 let wait_start = Instant::now();
757 let server_reply = client
758 .send(IpcProtocolMessage {
759 kind: IpcProtocolMessageKind::TestMessage,
760 })
761 .await;
762 assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
763 if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
764 panic!("server reply was of unexpected type: {:?}", server_reply);
765 }
766 }
767
768 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
769 async fn custom_timeout_test() {
770 let (server_key, mut server) =
771 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
772 match message.kind {
773 _ => None,
774 }
775 })
776 .await
777 .unwrap();
778 let mut client = client::IpcRpcClient::initialize_client(
779 server_key,
780 |message: IpcProtocolMessage| async move {
781 match message.kind {
782 _ => None,
783 }
784 },
785 )
786 .await
787 .unwrap();
788 server.schema_validated().await.unwrap().assert_success();
789 client.schema_validated().await.unwrap().assert_success();
790 let custom_timeout: Duration = DEFAULT_REPLY_TIMEOUT / 2;
791 let wait_start = Instant::now();
792 let client_reply = server
793 .send_timeout(
794 IpcProtocolMessage {
795 kind: IpcProtocolMessageKind::TestMessage,
796 },
797 custom_timeout,
798 )
799 .await;
800 assert!(wait_start.elapsed() >= custom_timeout);
801 assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
802 if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
803 panic!("client reply was of unexpected type: {:?}", client_reply);
804 }
805 let wait_start = Instant::now();
806 let server_reply = client
807 .send_timeout(
808 IpcProtocolMessage {
809 kind: IpcProtocolMessageKind::TestMessage,
810 },
811 custom_timeout,
812 )
813 .await;
814 assert!(wait_start.elapsed() >= custom_timeout);
815 assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
816 if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
817 panic!("server reply was of unexpected type: {:?}", server_reply);
818 }
819 }
820 #[test_log::test]
821 fn server_drop_does_not_hang() {
822 let thread = std::thread::spawn(|| {
823 let runtime = tokio::runtime::Runtime::new().unwrap();
824 runtime.block_on(async {
825 let (_server_key, _server) = server::IpcRpcServer::initialize_server(
826 |message: IpcProtocolMessage| async move {
827 match message.kind {
828 _ => None,
829 }
830 },
831 )
832 .await
833 .unwrap();
834 })
835 });
836
837 let start = Instant::now();
838 let timeout = Duration::from_secs(5);
839 while !thread.is_finished() {
840 if start.elapsed() >= timeout {
841 std::process::exit(1);
843 }
844 }
845 }
846
    /// Drops the server while a client is connected and checks that the client observes
    /// the disconnect, that the runtime shuts down quickly, and that the server's
    /// message handler has been dropped by then.
    #[test_log::test]
    fn server_disconnect_test() {
        // The strong count of this `Arc` reveals whether the handler closure that
        // captures a clone of it is still alive.
        let drop_detector = Arc::new(());
        let drop_detector_clone = drop_detector.clone();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            let (server_key, server) = server::IpcRpcServer::initialize_server({
                // This clone is owned by the handler closure.
                let drop_detector_clone = drop_detector_clone.clone();
                move |message: IpcProtocolMessage| {
                    let drop_detector_clone = drop_detector_clone.clone();
                    async move {
                        match message.kind {
                            _ => {
                                let _ = drop_detector_clone.clone();
                                None
                            }
                        }
                    }
                }
            })
            .await
            .unwrap();
            let client = client::IpcRpcClient::initialize_client(
                server_key,
                |message: IpcProtocolMessage| async move {
                    match message.kind {
                        _ => None,
                    }
                },
            )
            .await
            .unwrap();
            // Three owners: the original, the clone moved into this async block, and
            // the clone held by the server's handler.
            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
            drop(server);
            client.wait_for_server_to_disconnect().await.unwrap();
        });
        let start_shutdown = Instant::now();
        // Allow up to five seconds, but require shutdown to take less than three.
        runtime.shutdown_timeout(Duration::from_secs(5));
        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
        // Only the original may remain, i.e. the handler has been dropped.
        assert_eq!(Arc::strong_count(&drop_detector), 1);
    }
896
    /// Drops the client while connected to a server and checks that the server observes
    /// the disconnect, that the runtime shuts down quickly, and that the client's
    /// message handler has been dropped by then.
    #[test_log::test]
    fn client_disconnect_test() {
        // Use the standard library clock here rather than the `Instant` brought in by
        // `use super::*`.
        use std::time::Instant;

        // The strong count of this `Arc` reveals whether the handler closure that
        // captures a clone of it is still alive.
        let drop_detector = Arc::new(());
        let drop_detector_clone = drop_detector.clone();
        let runtime = tokio::runtime::Runtime::new().unwrap();
        runtime.block_on(async move {
            let (server_key, mut server) =
                server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
                    match message.kind {
                        _ => None,
                    }
                })
                .await
                .unwrap();
            let client = client::IpcRpcClient::initialize_client(server_key, {
                // This clone is owned by the handler closure.
                let drop_detector_clone = drop_detector_clone.clone();
                move |message: IpcProtocolMessage| {
                    let _drop_detector_clone = drop_detector_clone.clone();
                    async move {
                        match message.kind {
                            _ => None,
                        }
                    }
                }
            })
            .await
            .unwrap();
            // Three owners: the original, the clone moved into this async block, and
            // the clone held by the client's handler.
            assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
            drop(client);
            server.wait_for_client_to_disconnect().await.unwrap();
        });
        let start_shutdown = Instant::now();
        // Allow up to five seconds, but require shutdown to take less than three.
        runtime.shutdown_timeout(Duration::from_secs(5));
        assert!(start_shutdown.elapsed() < Duration::from_secs(3));
        // Only the original may remain, i.e. the handler has been dropped.
        assert_eq!(Arc::strong_count(&drop_detector), 1);
    }
941
942 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
943 async fn rpc_call_macro_test() {
944 let (server_key, mut server) =
945 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
946 match message.kind {
947 _ => Some(IpcProtocolMessage {
948 kind: IpcProtocolMessageKind::ServerTestReply,
949 }),
950 }
951 })
952 .await
953 .unwrap();
954 let mut client = client::IpcRpcClient::initialize_client(
955 server_key,
956 |message: IpcProtocolMessage| async move {
957 match message.kind {
958 _ => Some(IpcProtocolMessage {
959 kind: IpcProtocolMessageKind::ClientTestReply,
960 }),
961 }
962 },
963 )
964 .await
965 .unwrap();
966 server.schema_validated().await.unwrap().assert_success();
967 client.schema_validated().await.unwrap().assert_success();
968 rpc_call!(
969 sender: server,
970 to_send: IpcProtocolMessage {
971 kind: IpcProtocolMessageKind::TestMessage
972 },
973 receiver: IpcProtocolMessage {
974 kind: IpcProtocolMessageKind::ClientTestReply
975 } => {
976
977 },
978 )
979 .unwrap();
980 rpc_call!(
981 sender: client,
982 to_send: IpcProtocolMessage {
983 kind: IpcProtocolMessageKind::TestMessage
984 },
985 receiver: IpcProtocolMessage {
986 kind: IpcProtocolMessageKind::ServerTestReply
987 } => {
988
989 },
990 )
991 .unwrap();
992 }
993}