1use std::{future::Future, sync::Arc};
2
3use crate::{
4 get_log_prefix, process_incoming_mail, ConnectionKey, ConnectionStatus, InternalMessage,
5 InternalMessageKind, IpcReceiveStream, IpcReplyFuture, IpcRpcError, PendingReplyEntry,
6 SchemaValidationStatus, UserMessage,
7};
8use ipc_channel::ipc::{self, IpcSender};
9use tokio::{
10 sync::{mpsc, watch},
11 time::{Duration, Instant},
12};
13use uuid::Uuid;
14
15#[cfg(feature = "message-schema-validation")]
16use schemars::{schema_for, Schema};
17
18#[derive(Debug, Clone)]
20pub struct IpcRpcClient<U: UserMessage> {
21 ipc_sender: IpcSender<InternalMessage<U>>,
22 pending_reply_sender: mpsc::UnboundedSender<PendingReplyEntry<U>>,
23 status_receiver: watch::Receiver<ConnectionStatus>,
24 #[cfg(feature = "message-schema-validation")]
25 validation_receiver: watch::Receiver<Option<SchemaValidationStatus>>,
26 log_prefix: String,
27 ref_count: Option<Arc<()>>,
28}
29
30impl<U: UserMessage> IpcRpcClient<U> {
31 pub async fn initialize_client<F, Fut>(
33 key: ConnectionKey,
34 message_handler: F,
35 ) -> Result<IpcRpcClient<U>, IpcRpcError>
36 where
37 F: Fn(U) -> Fut + Send + Sync + 'static,
38 Fut: Future<Output = Option<U>> + Send,
39 {
40 let ipc_sender = IpcSender::connect(key.to_string())?;
41 let (sender, receiver) = ipc::channel::<InternalMessage<U>>()?;
42 ipc_sender.send(InternalMessage {
43 uuid: Uuid::new_v4(),
44 kind: InternalMessageKind::InitConnection(sender),
45 })?;
46 let ipc_sender_clone = ipc_sender.clone();
47 let (pending_reply_sender, pending_reply_receiver) = tokio::sync::mpsc::unbounded_channel();
48 let (status_sender, status_receiver) = watch::channel(ConnectionStatus::Connected);
49 #[cfg(feature = "message-schema-validation")]
50 let (validation_sender, validation_receiver) = watch::channel(None);
51 tokio::task::spawn(async move {
52 process_incoming_mail(
53 false,
54 pending_reply_receiver,
55 IpcReceiveStream::new(receiver),
56 message_handler,
57 ipc_sender_clone,
58 status_sender,
59 )
60 .await;
61 });
62 let log_prefix = get_log_prefix(false);
63 log::info!("{}Client initialized!", log_prefix);
64 let ret = IpcRpcClient {
65 ipc_sender,
66 pending_reply_sender,
67 status_receiver,
68 #[cfg(feature = "message-schema-validation")]
69 validation_receiver,
70 log_prefix,
71 ref_count: Some(Arc::new(())),
72 };
73 #[cfg(feature = "message-schema-validation")]
74 {
75 let reply_future = ret.internal_send(
76 InternalMessageKind::UserMessageSchema(
77 serde_json::to_string(&schema_for!(U))
78 .expect("upstream guarantees this won't fail"),
79 ),
80 crate::DEFAULT_REPLY_TIMEOUT,
81 );
82 tokio::spawn(async move {
83 match reply_future.await {
84 Ok(InternalMessageKind::UserMessageSchemaOk) => {
85 log::info!("Remote server validated user message schema");
86 if let Err(e) =
87 validation_sender.send(Some(SchemaValidationStatus::SchemasMatched))
88 {
89 log::error!("Failed to set validation_status {e:#?}");
90 }
91 }
92 Ok(InternalMessageKind::UserMessageSchemaError { other_schema }) => {
93 let my_schema = schema_for!(U);
94 let res =
95 validation_sender.send(Some(SchemaValidationStatus::SchemaMismatch {
96 our_schema: serde_json::to_string(&my_schema)
97 .expect("upstream guarantees this won't fail"),
98 their_schema: other_schema.clone(),
99 }));
100 if let Err(e) = res {
101 log::error!("Failed to set validation_status {e:#?}");
102 }
103 match serde_json::from_str::<Schema>(&other_schema) {
104 Ok(other_schema) => {
105 if other_schema == my_schema {
106 log::error!("Server failed validation on user message schema, but the schemas match. This is probably a bug in ipc-rpc.");
107 } else {
108 log::error!("Failed to validate that user messages have the same schema. Messages may fail to serialize and deserialize correctly. This is a serious problem.\nClient Schema {my_schema:#?}\nServer Schema {other_schema:#?}");
109 }
110 }
111 Err(_) => {
112 log::error!("Server failed validation on user schema, and we failed to deserialize incoming schema properly, got {other_schema:?}");
113 }
114 }
115 }
116 Ok(m) => {
117 log::error!("Unexpected reply for user message schema validation {m:#?}");
118 if let Err(e) = validation_sender
119 .send(Some(SchemaValidationStatus::ValidationNotPerformedProperly))
120 {
121 log::error!("Failed to set validation_status {e:#?}");
122 }
123 }
124 Err(IpcRpcError::ConnectionDropped) => {
125 }
127 Err(e) => {
128 log::error!("Failed to validate user message schema, messages may fail to serialize and deserialize correctly. Was the server compiled without the message-schema-validation feature? {e:#?}");
129 if let Err(e) = validation_sender.send(Some(
130 SchemaValidationStatus::ValidationCommunicationFailed(e),
131 )) {
132 log::error!("Failed to set validation_status {e:#?}");
133 }
134 }
135 }
136 });
137 }
138 Ok(ret)
139 }
140
141 fn internal_send(
142 &self,
143 message_kind: InternalMessageKind<U>,
144 timeout: Duration,
145 ) -> impl Future<Output = Result<InternalMessageKind<U>, IpcRpcError>> + Send + 'static {
146 let (sender, receiver) = mpsc::unbounded_channel();
147 let message = InternalMessage {
148 uuid: Uuid::new_v4(),
149 kind: message_kind,
150 };
151 if let Err(e) = self
152 .pending_reply_sender
153 .send((message.uuid, (sender, Instant::now() + timeout)))
154 {
155 log::error!("Failed to send entry for reply drop box {:?}", e);
156 }
157 let result = self.ipc_sender.send(message);
158 async move {
159 result?;
160 IpcReplyFuture { receiver }.await
161 }
162 }
163
164 pub fn send_timeout(
166 &self,
167 user_message: U,
168 timeout: Duration,
169 ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
170 let send_fut = self.internal_send(InternalMessageKind::UserMessage(user_message), timeout);
171 async move {
172 send_fut.await.map(|m| match m {
173 InternalMessageKind::UserMessage(m) => m,
174 _ => panic!(
175 "Got a non-user message reply to a user message. This is a bug in ipc-rpc."
176 ),
177 })
178 }
179 }
180
181 pub fn send(
183 &self,
184 user_message: U,
185 ) -> impl Future<Output = Result<U, IpcRpcError>> + Send + 'static {
186 self.send_timeout(user_message, crate::DEFAULT_REPLY_TIMEOUT)
187 }
188
189 pub fn wait_for_server_to_disconnect(
190 &self,
191 ) -> impl Future<Output = Result<(), IpcRpcError>> + Send + 'static {
192 let mut status_receiver = self.status_receiver.clone();
193 async move {
194 if let Some(r) = status_receiver.borrow().session_end_result() {
196 return r;
197 }
198 loop {
200 if status_receiver.changed().await.is_err() {
201 return Err(IpcRpcError::ConnectionDropped);
202 }
203 if let Some(r) = status_receiver.borrow().session_end_result() {
204 return r;
205 }
206 }
207 }
208 }
209
210 pub async fn schema_validated(&mut self) -> Result<SchemaValidationStatus, IpcRpcError> {
213 #[cfg(not(feature = "message-schema-validation"))]
214 {
215 Ok(SchemaValidationStatus::ValidationDisabledAtCompileTime)
216 }
217 #[cfg(feature = "message-schema-validation")]
218 {
219 if self.validation_receiver.borrow_and_update().is_none() {
220 self.validation_receiver
221 .changed()
222 .await
223 .map_err(|_| IpcRpcError::ConnectionDropped)?;
224 }
225 Ok(self
226 .validation_receiver
227 .borrow()
228 .as_ref()
229 .expect("the prior guaranteed this isn't empty")
230 .clone())
231 }
232 }
233}
234
235impl<U: UserMessage> Drop for IpcRpcClient<U> {
236 fn drop(&mut self) {
237 if Arc::try_unwrap(self.ref_count.take().unwrap()).is_ok() {
238 if let Err(e) = self.ipc_sender.send(InternalMessage {
239 uuid: Uuid::new_v4(),
240 kind: InternalMessageKind::Hangup,
241 }) {
242 log::error!(
243 "{}Error sending hangup message to server: {:?}",
244 self.log_prefix,
245 e
246 );
247 }
248 }
249 }
250}