ipc_rpc/
client.rs

1use std::{future::Future, sync::Arc};
2
3use crate::{
4    get_log_prefix, process_incoming_mail, ConnectionKey, ConnectionStatus, InternalMessage,
5    InternalMessageKind, IpcReceiveStream, IpcReplyFuture, IpcRpcError, PendingReplyEntry,
6    SchemaValidationStatus, UserMessage,
7};
8use ipc_channel::ipc::{self, IpcSender};
9use tokio::{
10    sync::{mpsc, watch},
11    time::{Duration, Instant},
12};
13use uuid::Uuid;
14
15#[cfg(feature = "message-schema-validation")]
16use schemars::{schema_for, Schema};
17
18/// Used to send messages to the connected server.
19#[derive(Debug, Clone)]
20pub struct IpcRpcClient<U: UserMessage> {
21    ipc_sender: IpcSender<InternalMessage<U>>,
22    pending_reply_sender: mpsc::UnboundedSender<PendingReplyEntry<U>>,
23    status_receiver: watch::Receiver<ConnectionStatus>,
24    #[cfg(feature = "message-schema-validation")]
25    validation_receiver: watch::Receiver<Option<SchemaValidationStatus>>,
26    log_prefix: String,
27    ref_count: Option<Arc<()>>,
28}
29
30impl<U: UserMessage> IpcRpcClient<U> {
31    /// Initializes a client connected to the server which was paired with the given [`ConnectionKey`].
32    pub async fn initialize_client<F, Fut>(
33        key: ConnectionKey,
34        message_handler: F,
35    ) -> Result<IpcRpcClient<U>, IpcRpcError>
36    where
37        F: Fn(U) -> Fut + Send + Sync + 'static,
38        Fut: Future<Output = Option<U>> + Send,
39    {
40        let ipc_sender = IpcSender::connect(key.to_string())?;
41        let (sender, receiver) = ipc::channel::<InternalMessage<U>>()?;
42        ipc_sender.send(InternalMessage {
43            uuid: Uuid::new_v4(),
44            kind: InternalMessageKind::InitConnection(sender),
45        })?;
46        let ipc_sender_clone = ipc_sender.clone();
47        let (pending_reply_sender, pending_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
48        let (status_sender, status_receiver) = watch::channel(ConnectionStatus::Connected);
49        #[cfg(feature = "message-schema-validation")]
50        let (validation_sender, validation_receiver) = watch::channel(None);
51        tokio::task::spawn(async move {
52            process_incoming_mail(
53                false,
54                pending_reply_receiver,
55                IpcReceiveStream::new(receiver),
56                message_handler,
57                ipc_sender_clone,
58                status_sender,
59            )
60            .await;
61        });
62        let log_prefix = get_log_prefix(false);
63        log::info!("{}Client initialized!", log_prefix);
64        let ret = IpcRpcClient {
65            ipc_sender,
66            pending_reply_sender,
67            status_receiver,
68            #[cfg(feature = "message-schema-validation")]
69            validation_receiver,
70            log_prefix,
71            ref_count: Some(Arc::new(())),
72        };
73        #[cfg(feature = "message-schema-validation")]
74        {
75            let reply_future = ret.internal_send(
76                InternalMessageKind::UserMessageSchema(
77                    serde_json::to_string(&schema_for!(U))
78                        .expect("upstream guarantees this won't fail"),
79                ),
80                crate::DEFAULT_REPLY_TIMEOUT,
81            );
82            tokio::spawn(async move {
83                match reply_future.await {
84                    Ok(InternalMessageKind::UserMessageSchemaOk) => {
85                        log::info!("Remote server validated user message schema");
86                        if let Err(e) =
87                            validation_sender.send(Some(SchemaValidationStatus::SchemasMatched))
88                        {
89                            log::error!("Failed to set validation_status {e:#?}");
90                        }
91                    }
92                    Ok(InternalMessageKind::UserMessageSchemaError { other_schema }) => {
93                        let my_schema = schema_for!(U);
94                        let res =
95                            validation_sender.send(Some(SchemaValidationStatus::SchemaMismatch {
96                                our_schema: serde_json::to_string(&my_schema)
97                                    .expect("upstream guarantees this won't fail"),
98                                their_schema: other_schema.clone(),
99                            }));
100                        if let Err(e) = res {
101                            log::error!("Failed to set validation_status {e:#?}");
102                        }
103                        match serde_json::from_str::<Schema>(&other_schema) {
104                            Ok(other_schema) => {
105                                if other_schema == my_schema {
106                                    log::error!("Server failed validation on user message schema, but the schemas match. This is probably a bug in ipc-rpc.");
107                                } else {
108                                    log::error!("Failed to validate that user messages have the same schema. Messages may fail to serialize and deserialize correctly. This is a serious problem.\nClient Schema {my_schema:#?}\nServer Schema {other_schema:#?}");
109                                }
110                            }
111                            Err(_) => {
112                                log::error!("Server failed validation on user schema, and we failed to deserialize incoming schema properly, got {other_schema:?}");
113                            }
114                        }
115                    }
116                    Ok(m) => {
117                        log::error!("Unexpected reply for user message schema validation {m:#?}");
118                        if let Err(e) = validation_sender
119                            .send(Some(SchemaValidationStatus::ValidationNotPerformedProperly))
120                        {
121                            log::error!("Failed to set validation_status {e:#?}");
122                        }
123                    }
124                    Err(IpcRpcError::ConnectionDropped) => {
125                        // Do nothing, connection was dropped before validation completed.
126                    }
127                    Err(e) => {
128                        log::error!("Failed to validate user message schema, messages may fail to serialize and deserialize correctly. Was the server compiled without the message-schema-validation feature? {e:#?}");
129                        if let Err(e) = validation_sender.send(Some(
130                            SchemaValidationStatus::ValidationCommunicationFailed(e),
131                        )) {
132                            log::error!("Failed to set validation_status {e:#?}");
133                        }
134                    }
135                }
136            });
137        }
138        Ok(ret)
139    }
140
141    fn internal_send(
142        &self,
143        message_kind: InternalMessageKind<U>,
144        timeout: Duration,
145    ) -> impl Future<Output = Result<InternalMessageKind<U>, IpcRpcError>> + Send + 'static {
146        let (sender, receiver) = mpsc::unbounded_channel();
147        let message = InternalMessage {
148            uuid: Uuid::new_v4(),
149            kind: message_kind,
150        };
151        if let Err(e) = self
152            .pending_reply_sender
153            .send((message.uuid, (sender, Instant::now() + timeout)))
154        {
155            log::error!("Failed to send entry for reply drop box {:?}", e);
156        }
157        let result = self.ipc_sender.send(message);
158        async move {
159            result?;
160            IpcReplyFuture { receiver }.await
161        }
162    }
163
164    /// Sends a message, waiting the given `timeout` for a reply.
165    pub fn send_timeout(
166        &self,
167        user_message: U,
168        timeout: Duration,
169    ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
170        let send_fut = self.internal_send(InternalMessageKind::UserMessage(user_message), timeout);
171        async move {
172            send_fut.await.map(|m| match m {
173                InternalMessageKind::UserMessage(m) => m,
174                _ => panic!(
175                    "Got a non-user message reply to a user message. This is a bug in ipc-rpc."
176                ),
177            })
178        }
179    }
180
181    /// Sends a message, will give up on receiving a reply after the [`DEFAULT_REPLY_TIMEOUT`](./constant.DEFAULT_REPLY_TIMEOUT.html) has passed.
182    pub fn send(
183        &self,
184        user_message: U,
185    ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
186        self.send_timeout(user_message, crate::DEFAULT_REPLY_TIMEOUT)
187    }
188
189    pub fn wait_for_server_to_disconnect(
190        &self,
191    ) -> impl Future<Output = Result<(), IpcRpcError>> + Send + 'static {
192        let mut status_receiver = self.status_receiver.clone();
193        async move {
194            // Has the session already ended?
195            if let Some(r) = status_receiver.borrow().session_end_result() {
196                return r;
197            }
198            // If not, wait for the session to end.
199            loop {
200                if status_receiver.changed().await.is_err() {
201                    return Err(IpcRpcError::ConnectionDropped);
202                }
203                if let Some(r) = status_receiver.borrow().session_end_result() {
204                    return r;
205                }
206            }
207        }
208    }
209
210    /// Returns the outcome of automatic schema validation testing. This testing is performed
211    /// on connection initiation.
212    pub async fn schema_validated(&mut self) -> Result<SchemaValidationStatus, IpcRpcError> {
213        #[cfg(not(feature = "message-schema-validation"))]
214        {
215            Ok(SchemaValidationStatus::ValidationDisabledAtCompileTime)
216        }
217        #[cfg(feature = "message-schema-validation")]
218        {
219            if self.validation_receiver.borrow_and_update().is_none() {
220                self.validation_receiver
221                    .changed()
222                    .await
223                    .map_err(|_| IpcRpcError::ConnectionDropped)?;
224            }
225            Ok(self
226                .validation_receiver
227                .borrow()
228                .as_ref()
229                .expect("the prior guaranteed this isn't empty")
230                .clone())
231        }
232    }
233}
234
235impl<U: UserMessage> Drop for IpcRpcClient<U> {
236    fn drop(&mut self) {
237        if Arc::try_unwrap(self.ref_count.take().unwrap()).is_ok() {
238            if let Err(e) = self.ipc_sender.send(InternalMessage {
239                uuid: Uuid::new_v4(),
240                kind: InternalMessageKind::Hangup,
241            }) {
242                log::error!(
243                    "{}Error sending hangup message to server: {:?}",
244                    self.log_prefix,
245                    e
246                );
247            }
248        }
249    }
250}