1use std::{
61 collections::HashMap,
62 convert::Infallible,
63 env,
64 ffi::OsString,
65 fmt::Debug,
66 future::Future,
67 io,
68 path::Path,
69 pin::Pin,
70 str::FromStr,
71 sync::Arc,
72 task::{Context, Poll},
73};
74
75use futures::{Stream, StreamExt};
76use ipc_channel::ipc::{IpcError, IpcReceiver, IpcSender, TryRecvError};
77use serde::{de::DeserializeOwned, Deserialize, Serialize};
78use thiserror::Error;
79use tokio::{
80 sync::{mpsc, watch},
81 time::{Duration, Instant},
82};
83use uuid::Uuid;
84
85#[cfg(feature = "message-schema-validation")]
86use schemars::{schema::RootSchema, schema_for, JsonSchema};
87
88mod client;
89pub use client::*;
90mod server;
91pub use server::*;
92
93#[derive(Deserialize, Serialize, Debug, Clone)]
95pub struct ConnectionKey(String);
96
97impl From<String> for ConnectionKey {
98 fn from(s: String) -> Self {
99 Self(s)
100 }
101}
102
103impl FromStr for ConnectionKey {
104 type Err = Infallible;
105
106 fn from_str(s: &str) -> Result<Self, Self::Err> {
107 Ok(Self(s.to_string()))
108 }
109}
110
111impl ToString for ConnectionKey {
112 fn to_string(&self) -> String {
113 self.0.clone()
114 }
115}
116
117impl From<ConnectionKey> for OsString {
118 fn from(s: ConnectionKey) -> Self {
119 OsString::from(s.0)
120 }
121}
122
123impl From<ConnectionKey> for String {
124 fn from(key: ConnectionKey) -> Self {
125 key.0
126 }
127}
128
129type PendingReplyEntry<U> = (
134 Uuid,
135 (
136 mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
137 Instant,
138 ),
139);
140#[derive(Deserialize, Serialize, Debug, Clone)]
143#[serde(bound(deserialize = ""))]
144struct InternalMessage<U: UserMessage> {
145 uuid: uuid::Uuid,
148 kind: InternalMessageKind<U>,
150}
151
152#[derive(Deserialize, Serialize, Debug, Clone)]
156#[serde(bound(deserialize = ""))]
157enum InternalMessageKind<U: UserMessage> {
158 InitConnection(IpcSender<InternalMessage<U>>),
160 Hangup,
162 UserMessage(U),
164 UserMessageSchema(String),
166 UserMessageSchemaOk,
168 UserMessageSchemaError { other_schema: String },
170}
171
172#[derive(Clone, Debug, Error)]
174pub enum IpcRpcError {
175 #[error("io error")]
176 IoError(#[from] Arc<io::Error>),
177 #[error("internal ipc channel error")]
178 IpcChannelError(#[from] Arc<ipc_channel::Error>),
179 #[error("connection initialization timed out")]
180 ConnectTimeout,
181 #[error("connection established, but initial handshake was not performed properly")]
182 HandshakeFailure,
183 #[error("client already connected")]
184 ClientAlreadyConnected,
185 #[error("peer disconnected")]
186 Disconnected,
187 #[error("time out while waiting for a reply")]
188 ReplyTimeout,
189 #[error("connection dropped pre-emptively")]
190 ConnectionDropped,
191}
192
193impl From<io::Error> for IpcRpcError {
194 fn from(e: io::Error) -> Self {
195 Self::IoError(Arc::new(e))
196 }
197}
198
199impl From<ipc_channel::Error> for IpcRpcError {
200 fn from(e: ipc_channel::Error) -> Self {
201 Self::IpcChannelError(Arc::new(e))
202 }
203}
204
205pub const DEFAULT_REPLY_TIMEOUT: Duration = Duration::from_secs(5);
216
217async fn process_incoming_mail<
220 Fut: Future<Output = Option<U>> + Send,
221 F: Fn(U) -> Fut + Send + Sync + 'static,
222 U: UserMessage,
223>(
224 is_server: bool,
225 mut pending_reply_receiver: mpsc::UnboundedReceiver<PendingReplyEntry<U>>,
226 mut receiver: IpcReceiveStream<InternalMessage<U>>,
227 message_handler: F,
228 response_sender: IpcSender<InternalMessage<U>>,
229 status_sender: watch::Sender<ConnectionStatus>,
230) {
231 let mut pending_replies = HashMap::<
232 Uuid,
233 (
234 mpsc::UnboundedSender<Result<InternalMessageKind<U>, IpcRpcError>>,
235 Instant,
236 ),
237 >::new();
238 let message_handler = Arc::new(message_handler);
239 let log_prefix = get_log_prefix(is_server);
240 log::info!("{}Processing incoming mail!", log_prefix);
241 let mut consecutive_error_count = 0;
242 let mut pending_reply_scheduled_time = Option::<Instant>::None;
243
244 let add_pending_reply = |pending_reply_scheduled_time: &mut Option<Instant>,
245 pending_replies: &mut HashMap<_, _>,
246 (key, (sender, timeout))| {
247 *pending_reply_scheduled_time = Some(
248 pending_reply_scheduled_time
249 .map(|t| t.min(timeout))
250 .unwrap_or(timeout),
251 );
252
253 pending_replies.insert(key, (sender, timeout));
254 };
255 loop {
256 while let Ok(pending_reply) = pending_reply_receiver.try_recv() {
259 add_pending_reply(
260 &mut pending_reply_scheduled_time,
261 &mut pending_replies,
262 pending_reply,
263 );
264 }
265 tokio::select! {
266 true = async { if let Some(t) = pending_reply_scheduled_time { tokio::time::sleep_until(t).await; true } else { false } } => {
267 pending_replies.retain(|_k, v| {
268 let keep = v.1 > Instant::now();
269 if !keep {
270 let _ = v.0.send(Err(IpcRpcError::ReplyTimeout));
271 }
272 keep
273 });
274 pending_reply_scheduled_time = pending_replies.values().map(|i| i.1).min();
275 }
276 pending_reply = pending_reply_receiver.recv() => {
277 match pending_reply {
278 None => {
279 break;
281 }
282 Some(pending_reply) => {
283 add_pending_reply(&mut pending_reply_scheduled_time, &mut pending_replies, pending_reply);
284 }
285 }
286 },
287 r = receiver.next() => {
288 match r {
289 None => {
290 break;
292 }
293 Some(Err(e)) => {
294 if let IpcError::Disconnected = e {
295 log::info!("{}Peer disconnected.", log_prefix);
296 break;
297 } else {
298 log::error!("{}Error receiving message from peer {:?}", log_prefix, e);
299 consecutive_error_count += 1;
300 if consecutive_error_count > 20 {
301 log::error!("{}Too many consecutive errors, shutting down.", log_prefix);
302 break;
303 }
304 }
305 }
306 Some(Ok(message)) => {
307 consecutive_error_count = 0;
308 log::debug!("{}Got message! {:?}", log_prefix, message);
309 let reply = pending_replies.remove(&message.uuid);
310 if let Some((reply_drop_box, _)) = reply {
311 log::debug!("{}It's a reply, forwarding!", log_prefix);
312 let _ = reply_drop_box.send(Ok(message.kind));
314 } else {
315 log::debug!("{}It's not a reply, handling!", log_prefix);
316 let message_uuid = message.uuid;
317 match message.kind {
318 InternalMessageKind::UserMessage(user_message) => {
319 let message_handler = Arc::clone(&message_handler);
320 let response_sender = response_sender.clone();
321 tokio::spawn(async move {
322 if let Some(m) = message_handler(user_message).await {
323 let r = response_sender.send(InternalMessage {
324 uuid: message_uuid,
325 kind: InternalMessageKind::UserMessage(m),
326 });
327 if let Err(e) = r {
328 log::error!("Failed to send reply {e:?}");
329 }
330 }
331 });
332 }
333 #[cfg(feature = "message-schema-validation")]
334 InternalMessageKind::UserMessageSchema(other_schema) => {
335 let my_schema = schema_for!(U);
336 let kind = match serde_json::from_str::<RootSchema>(&other_schema) {
337 Ok(other_schema) => {
338 if other_schema == my_schema {
339 InternalMessageKind::UserMessageSchemaOk
340 } else {
341 InternalMessageKind::UserMessageSchemaError {
342 other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
343 }
344 }
345 },
346 Err(_) => {
347 log::error!("Failed to deserialize incoming schema properly, got {other_schema:?}");
348 InternalMessageKind::UserMessageSchemaError {
349 other_schema: serde_json::to_string(&my_schema).expect("upstream guarantees this won't fail")
350 }
351 }
352 };
353 let r = response_sender.send(InternalMessage {
354 uuid: message_uuid,
355 kind,
356 });
357 if let Err(e) = r {
358 log::error!("Failed to send validation response {e:#?}");
359 }
360 }
361 InternalMessageKind::Hangup => {
362 break;
363 }
364 _ => {}
365 }
366 }
367 }
368 }
369 }
370 }
371 }
372 let _ = status_sender.send(ConnectionStatus::DisconnectedCleanly);
373}
374
375fn get_log_prefix(is_server: bool) -> String {
376 let first_arg = env::args()
377 .next()
378 .unwrap_or_else(|| String::from("Unknown"));
379 let process = Path::new(&first_arg)
380 .file_name()
381 .unwrap_or_else(|| "Unknown".as_ref())
382 .to_string_lossy();
383 if is_server {
384 format!("{} as Server: ", process)
385 } else {
386 format!("{} as Client: ", process)
387 }
388}
389
390#[derive(Clone, Debug)]
392pub enum ConnectionStatus {
393 WaitingForClient,
395 Connected,
397 DisconnectedCleanly,
399 DisconnectError(IpcRpcError),
401}
402
403impl ConnectionStatus {
404 pub fn session_end_result(&self) -> Option<Result<(), IpcRpcError>> {
405 match self {
406 ConnectionStatus::WaitingForClient | ConnectionStatus::Connected => None,
407 ConnectionStatus::DisconnectedCleanly => Some(Ok(())),
408 ConnectionStatus::DisconnectError(e) => Some(Err(e.clone())),
409 }
410 }
411}
412
413struct IpcReplyFuture<U: UserMessage> {
416 receiver: mpsc::UnboundedReceiver<Result<InternalMessageKind<U>, IpcRpcError>>,
417}
418
419impl<U: UserMessage> Future for IpcReplyFuture<U> {
420 type Output = Result<InternalMessageKind<U>, IpcRpcError>;
421
422 fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
423 self.receiver.poll_recv(cx).map(|o| match o {
424 Some(m) => m,
425 None => Err(IpcRpcError::ConnectionDropped),
426 })
427 }
428}
429
430struct IpcReceiveStream<T> {
431 receiver: mpsc::UnboundedReceiver<Result<T, IpcError>>,
432}
433
434impl<T> IpcReceiveStream<T>
435where
436 T: Send + for<'de> Deserialize<'de> + Serialize + 'static,
437{
438 pub fn new(ipc_receiver: IpcReceiver<T>) -> Self {
439 let (sender, receiver) = mpsc::unbounded_channel();
440 tokio::task::spawn_blocking(move || loop {
441 match ipc_receiver.try_recv_timeout(Duration::from_millis(250)) {
442 Ok(msg) => {
443 if sender.send(Ok(msg)).is_err() {
444 break;
445 }
446 }
447 Err(TryRecvError::IpcError(e)) => {
448 if sender.send(Err(e)).is_err() {
449 break;
450 }
451 }
452 Err(TryRecvError::Empty) => {
453 if sender.is_closed() {
454 break;
455 }
456 }
457 }
458 });
459 Self { receiver }
460 }
461}
462
463impl<T> Stream for IpcReceiveStream<T> {
464 type Item = Result<T, IpcError>;
465
466 fn poll_next(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Option<Self::Item>> {
467 self.receiver.poll_recv(cx)
468 }
469}
470
471#[derive(Debug, Clone)]
474pub enum SchemaValidationStatus {
475 ValidationDisabledAtCompileTime,
477 ValidationNotPerformedProperly,
479 ValidationCommunicationFailed(IpcRpcError),
481 SchemasMatched,
483 SchemaMismatch {
485 our_schema: String,
486 their_schema: String,
487 },
488}
489
490impl SchemaValidationStatus {
491 pub fn is_success(&self) -> bool {
493 matches!(self, SchemaValidationStatus::SchemasMatched)
494 }
495
496 pub fn assert_success(&self) {
498 if !self.is_success() {
499 panic!("ipc-rpc user message schema failed to validate, error {self:#?}");
500 }
501 }
502}
503
504#[cfg(feature = "message-schema-validation")]
520pub trait UserMessage:
521 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
522{
523}
524
525#[cfg(feature = "message-schema-validation")]
526impl<T> UserMessage for T where
527 T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize + JsonSchema
528{
529}
530
531#[cfg(not(feature = "message-schema-validation"))]
534pub trait UserMessage: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
535
536#[cfg(not(feature = "message-schema-validation"))]
537impl<T> UserMessage for T where T: 'static + Send + Debug + Clone + DeserializeOwned + Serialize {}
538
539#[macro_export]
583macro_rules! rpc_call {
584 (sender: $sender:expr, to_send: $to_send:expr, receiver: $received:pat_param => $to_do:block,) => {
585 $sender.send($to_send).await.map(|m| match m {
586 $received => $to_do,
587 _ => panic!("rpc_call response didn't match given pattern"),
588 })
589 };
590}
591
592#[cfg(test)]
593mod tests {
594 use tokio::time::timeout;
595
596 use super::*;
597
598 #[cfg(not(feature = "message-schema-validation"))]
599 compile_error!("Tests must be executed with all features on");
600
601 #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
602 pub struct IpcProtocolMessage {
603 pub kind: IpcProtocolMessageKind,
604 }
605
606 #[derive(Deserialize, Serialize, Debug, Clone, JsonSchema)]
607 pub enum IpcProtocolMessageKind {
608 TestMessage,
609 ClientTestReply,
610 ServerTestReply,
611 }
612
613 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
614 async fn basic_dialogue() {
615 let (server_key, mut server) =
616 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
617 match message.kind {
618 IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
619 kind: IpcProtocolMessageKind::ServerTestReply,
620 }),
621 _ => None,
622 }
623 })
624 .await
625 .unwrap();
626 let mut client = client::IpcRpcClient::initialize_client(
627 server_key,
628 |message: IpcProtocolMessage| async move {
629 match message.kind {
630 IpcProtocolMessageKind::TestMessage => Some(IpcProtocolMessage {
631 kind: IpcProtocolMessageKind::ClientTestReply,
632 }),
633 _ => None,
634 }
635 },
636 )
637 .await
638 .unwrap();
639 server.schema_validated().await.unwrap().assert_success();
640 client.schema_validated().await.unwrap().assert_success();
641 let client_reply = server
642 .send(IpcProtocolMessage {
643 kind: IpcProtocolMessageKind::TestMessage,
644 })
645 .await;
646 if !matches!(
647 client_reply.as_ref().map(|r| &r.kind),
648 Ok(IpcProtocolMessageKind::ClientTestReply)
649 ) {
650 panic!("client reply was of unexpected type: {:?}", client_reply);
651 }
652 let server_reply = client
653 .send(IpcProtocolMessage {
654 kind: IpcProtocolMessageKind::TestMessage,
655 })
656 .await;
657 if !matches!(
658 server_reply.as_ref().map(|r| &r.kind),
659 Ok(IpcProtocolMessageKind::ServerTestReply)
660 ) {
661 panic!("server reply was of unexpected type: {:?}", server_reply);
662 }
663 }
664
665 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
666 async fn send_without_await() {
667 let (server_success_sender, mut server_success_receiver) = mpsc::unbounded_channel();
668 let (client_success_sender, mut client_success_receiver) = mpsc::unbounded_channel();
669 let (server_key, mut server) =
670 server::IpcRpcServer::initialize_server(move |message: IpcProtocolMessage| {
671 let server_success_sender = server_success_sender.clone();
672 async move {
673 match message.kind {
674 IpcProtocolMessageKind::TestMessage => {
675 server_success_sender.send(()).unwrap()
676 }
677 _ => {}
678 }
679 None
680 }
681 })
682 .await
683 .unwrap();
684 let mut client = client::IpcRpcClient::initialize_client(
685 server_key,
686 move |message: IpcProtocolMessage| {
687 let client_success_sender = client_success_sender.clone();
688 async move {
689 match message.kind {
690 IpcProtocolMessageKind::TestMessage => {
691 client_success_sender.send(()).unwrap()
692 }
693 _ => {}
694 }
695 None
696 }
697 },
698 )
699 .await
700 .unwrap();
701 server.schema_validated().await.unwrap().assert_success();
702 client.schema_validated().await.unwrap().assert_success();
703 let _ = server.send(IpcProtocolMessage {
704 kind: IpcProtocolMessageKind::TestMessage,
705 });
706 let _ = client.send(IpcProtocolMessage {
707 kind: IpcProtocolMessageKind::TestMessage,
708 });
709 assert_eq!(
710 timeout(Duration::from_secs(3), server_success_receiver.recv()).await,
711 Ok(Some(()))
712 );
713 assert_eq!(
714 timeout(Duration::from_secs(3), client_success_receiver.recv()).await,
715 Ok(Some(()))
716 );
717 }
718
719 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
720 async fn timeout_test() {
721 let (server_key, mut server) =
722 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
723 match message.kind {
724 _ => None,
725 }
726 })
727 .await
728 .unwrap();
729 let mut client = client::IpcRpcClient::initialize_client(
730 server_key,
731 |message: IpcProtocolMessage| async move {
732 match message.kind {
733 _ => None,
734 }
735 },
736 )
737 .await
738 .unwrap();
739 server.schema_validated().await.unwrap().assert_success();
740 client.schema_validated().await.unwrap().assert_success();
741 let wait_start = Instant::now();
742 let client_reply = server
743 .send(IpcProtocolMessage {
744 kind: IpcProtocolMessageKind::TestMessage,
745 })
746 .await;
747 assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
748 if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
749 panic!("client reply was of unexpected type: {:?}", client_reply);
750 }
751 let wait_start = Instant::now();
752 let server_reply = client
753 .send(IpcProtocolMessage {
754 kind: IpcProtocolMessageKind::TestMessage,
755 })
756 .await;
757 assert!(wait_start.elapsed() >= DEFAULT_REPLY_TIMEOUT);
758 if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
759 panic!("server reply was of unexpected type: {:?}", server_reply);
760 }
761 }
762
763 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
764 async fn custom_timeout_test() {
765 let (server_key, mut server) =
766 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
767 match message.kind {
768 _ => None,
769 }
770 })
771 .await
772 .unwrap();
773 let mut client = client::IpcRpcClient::initialize_client(
774 server_key,
775 |message: IpcProtocolMessage| async move {
776 match message.kind {
777 _ => None,
778 }
779 },
780 )
781 .await
782 .unwrap();
783 server.schema_validated().await.unwrap().assert_success();
784 client.schema_validated().await.unwrap().assert_success();
785 let custom_timeout: Duration = DEFAULT_REPLY_TIMEOUT / 2;
786 let wait_start = Instant::now();
787 let client_reply = server
788 .send_timeout(
789 IpcProtocolMessage {
790 kind: IpcProtocolMessageKind::TestMessage,
791 },
792 custom_timeout,
793 )
794 .await;
795 assert!(wait_start.elapsed() >= custom_timeout);
796 assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
797 if !matches!(client_reply, Err(IpcRpcError::ReplyTimeout)) {
798 panic!("client reply was of unexpected type: {:?}", client_reply);
799 }
800 let wait_start = Instant::now();
801 let server_reply = client
802 .send_timeout(
803 IpcProtocolMessage {
804 kind: IpcProtocolMessageKind::TestMessage,
805 },
806 custom_timeout,
807 )
808 .await;
809 assert!(wait_start.elapsed() >= custom_timeout);
810 assert!(wait_start.elapsed() < DEFAULT_REPLY_TIMEOUT);
811 if !matches!(server_reply, Err(IpcRpcError::ReplyTimeout)) {
812 panic!("server reply was of unexpected type: {:?}", server_reply);
813 }
814 }
815 #[test_log::test]
816 fn server_drop_does_not_hang() {
817 let thread = std::thread::spawn(|| {
818 let runtime = tokio::runtime::Runtime::new().unwrap();
819 runtime.block_on(async {
820 let (_server_key, _server) = server::IpcRpcServer::initialize_server(
821 |message: IpcProtocolMessage| async move {
822 match message.kind {
823 _ => None,
824 }
825 },
826 )
827 .await
828 .unwrap();
829 })
830 });
831
832 let start = Instant::now();
833 let timeout = Duration::from_secs(5);
834 while !thread.is_finished() {
835 if start.elapsed() >= timeout {
836 std::process::exit(1);
838 }
839 }
840 }
841
842 #[test_log::test]
845 fn server_disconnect_test() {
846 let drop_detector = Arc::new(());
849 let drop_detector_clone = drop_detector.clone();
850 let runtime = tokio::runtime::Runtime::new().unwrap();
851 runtime.block_on(async move {
852 let (server_key, server) = server::IpcRpcServer::initialize_server({
853 let drop_detector_clone = drop_detector_clone.clone();
854 move |message: IpcProtocolMessage| {
855 let drop_detector_clone = drop_detector_clone.clone();
856 async move {
857 match message.kind {
858 _ => {
859 let _ = drop_detector_clone.clone();
862 None
863 }
864 }
865 }
866 }
867 })
868 .await
869 .unwrap();
870 let client = client::IpcRpcClient::initialize_client(
871 server_key,
872 |message: IpcProtocolMessage| async move {
873 match message.kind {
874 _ => None,
875 }
876 },
877 )
878 .await
879 .unwrap();
880 assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
881 drop(server);
882 client.wait_for_server_to_disconnect().await.unwrap();
883 });
884 let start_shutdown = Instant::now();
887 runtime.shutdown_timeout(Duration::from_secs(5));
888 assert!(start_shutdown.elapsed() < Duration::from_secs(3));
889 assert_eq!(Arc::strong_count(&drop_detector), 1);
890 }
891
892 #[test_log::test]
895 fn client_disconnect_test() {
896 use std::time::Instant;
897
898 let drop_detector = Arc::new(());
901 let drop_detector_clone = drop_detector.clone();
902 let runtime = tokio::runtime::Runtime::new().unwrap();
903 runtime.block_on(async move {
904 let (server_key, mut server) =
905 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
906 match message.kind {
907 _ => None,
908 }
909 })
910 .await
911 .unwrap();
912 let client = client::IpcRpcClient::initialize_client(server_key, {
913 let drop_detector_clone = drop_detector_clone.clone();
914 move |message: IpcProtocolMessage| {
915 let _drop_detector_clone = drop_detector_clone.clone();
916 async move {
917 match message.kind {
918 _ => None,
919 }
920 }
921 }
922 })
923 .await
924 .unwrap();
925 assert_eq!(Arc::strong_count(&drop_detector_clone), 3);
926 drop(client);
927 server.wait_for_client_to_disconnect().await.unwrap();
928 });
929 let start_shutdown = Instant::now();
932 runtime.shutdown_timeout(Duration::from_secs(5));
933 assert!(start_shutdown.elapsed() < Duration::from_secs(3));
934 assert_eq!(Arc::strong_count(&drop_detector), 1);
935 }
936
937 #[test_log::test(tokio::test(flavor = "multi_thread", worker_threads = 3))]
938 async fn rpc_call_macro_test() {
939 let (server_key, mut server) =
940 server::IpcRpcServer::initialize_server(|message: IpcProtocolMessage| async move {
941 match message.kind {
942 _ => Some(IpcProtocolMessage {
943 kind: IpcProtocolMessageKind::ServerTestReply,
944 }),
945 }
946 })
947 .await
948 .unwrap();
949 let mut client = client::IpcRpcClient::initialize_client(
950 server_key,
951 |message: IpcProtocolMessage| async move {
952 match message.kind {
953 _ => Some(IpcProtocolMessage {
954 kind: IpcProtocolMessageKind::ClientTestReply,
955 }),
956 }
957 },
958 )
959 .await
960 .unwrap();
961 server.schema_validated().await.unwrap().assert_success();
962 client.schema_validated().await.unwrap().assert_success();
963 rpc_call!(
964 sender: server,
965 to_send: IpcProtocolMessage {
966 kind: IpcProtocolMessageKind::TestMessage
967 },
968 receiver: IpcProtocolMessage {
969 kind: IpcProtocolMessageKind::ClientTestReply
970 } => {
971
972 },
973 )
974 .unwrap();
975 rpc_call!(
976 sender: client,
977 to_send: IpcProtocolMessage {
978 kind: IpcProtocolMessageKind::TestMessage
979 },
980 receiver: IpcProtocolMessage {
981 kind: IpcProtocolMessageKind::ServerTestReply
982 } => {
983
984 },
985 )
986 .unwrap();
987 }
988}