reown_relay_client/
websocket.rs

1use {
2    self::connection::{connection_event_loop, ConnectionControl},
3    crate::{
4        error::{ClientError, Error},
5        ConnectionOptions,
6    },
7    reown_relay_rpc::{
8        domain::{MessageId, SubscriptionId, Topic},
9        rpc::{
10            BatchFetchMessages,
11            BatchReceiveMessages,
12            BatchSubscribe,
13            BatchSubscribeBlocking,
14            BatchUnsubscribe,
15            FetchMessages,
16            Publish,
17            Receipt,
18            Subscribe,
19            SubscribeBlocking,
20            Subscription,
21            SubscriptionError,
22            Unsubscribe,
23        },
24    },
25    std::{future::Future, sync::Arc, time::Duration},
26    tokio::sync::{
27        mpsc::{self, UnboundedSender},
28        oneshot,
29    },
30};
31pub use {fetch::*, inbound::*, outbound::*, stream::*};
32#[cfg(not(target_arch = "wasm32"))]
33pub type TransportError = tokio_tungstenite::tungstenite::Error;
34#[cfg(not(target_arch = "wasm32"))]
35pub use tokio_tungstenite::tungstenite::protocol::CloseFrame;
36#[cfg(target_arch = "wasm32")]
37pub type TransportError = tokio_tungstenite_wasm::Error;
38#[cfg(target_arch = "wasm32")]
39pub use tokio_tungstenite_wasm::CloseFrame;
40
41#[derive(Debug, thiserror::Error)]
42pub enum WebsocketClientError {
43    #[error("Failed to connect: {0}")]
44    ConnectionFailed(TransportError),
45
46    #[error("Connection closed: {0}")]
47    ConnectionClosed(CloseReason),
48
49    #[error("Failed to close connection: {0}")]
50    ClosingFailed(TransportError),
51
52    #[error("Websocket transport error: {0}")]
53    Transport(TransportError),
54
55    #[error("Url error: {0}")]
56    HttpErr(http::Error),
57
58    #[error("Not connected")]
59    NotConnected,
60}
61
62/// Wrapper around the websocket [`CloseFrame`] providing info about the
63/// connection closing reason.
64#[derive(Debug, Clone)]
65pub struct CloseReason(pub Option<CloseFrame<'static>>);
66
67impl std::fmt::Display for CloseReason {
68    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69        if let Some(frame) = &self.0 {
70            frame.fmt(f)
71        } else {
72            f.write_str("<close frame unavailable>")
73        }
74    }
75}
76
77mod connection;
78mod fetch;
79mod inbound;
80mod outbound;
81mod stream;
82
83/// The message received from a subscription.
84#[derive(Debug, Clone)]
85pub struct PublishedMessage {
86    pub message_id: MessageId,
87    pub subscription_id: SubscriptionId,
88    pub topic: Topic,
89    pub message: Arc<str>,
90    pub tag: u32,
91    pub published_at: chrono::DateTime<chrono::Utc>,
92    pub received_at: chrono::DateTime<chrono::Utc>,
93}
94
95impl PublishedMessage {
96    fn from_request(request: &InboundRequest<Subscription>) -> Self {
97        let Subscription { id, data } = request.data();
98        let now = chrono::Utc::now();
99
100        Self {
101            message_id: request.id(),
102            subscription_id: id.clone(),
103            topic: data.topic.clone(),
104            message: data.message.clone(),
105            tag: data.tag,
106            // TODO: Set proper value once implemented.
107            published_at: now,
108            received_at: now,
109        }
110    }
111}
112
113/// Handlers for the RPC stream events.
114pub trait ConnectionHandler: Send + 'static {
115    /// Called when a connection to the Relay is established.
116    fn connected(&mut self) {}
117
118    /// Called when the Relay connection is closed.
119    fn disconnected(&mut self, _frame: Option<CloseFrame<'static>>) {}
120
121    /// Called when a message is received from the Relay.
122    fn message_received(&mut self, message: PublishedMessage);
123
124    /// Called when an inbound error occurs, such as data deserialization
125    /// failure, or an unknown response message ID.
126    fn inbound_error(&mut self, _error: ClientError) {}
127
128    /// Called when an outbound error occurs, i.e. failed to write to the
129    /// websocket stream.
130    fn outbound_error(&mut self, _error: ClientError) {}
131}
132
133type SubscriptionResult<T> = Result<T, Error<SubscriptionError>>;
134
135/// The Relay WebSocket RPC client.
136///
137/// This provides the high-level access to all of the available RPC methods. For
138/// a lower-level RPC stream see [`ClientStream`](crate::client::ClientStream).
139#[derive(Debug, Clone)]
140pub struct Client {
141    control_tx: UnboundedSender<ConnectionControl>,
142}
143
144impl Client {
145    /// Creates a new [`Client`] with the provided handler.
146    pub fn new<T>(handler: T) -> Self
147    where
148        T: ConnectionHandler,
149    {
150        let (control_tx, control_rx) = mpsc::unbounded_channel();
151
152        let fut = connection_event_loop(control_rx, handler);
153        #[cfg(target_arch = "wasm32")]
154        wasm_bindgen_futures::spawn_local(fut);
155
156        #[cfg(not(target_arch = "wasm32"))]
157        tokio::spawn(fut);
158
159        Self { control_tx }
160    }
161
162    /// Publishes a message over the network on given topic.
163    pub fn publish(
164        &self,
165        topic: Topic,
166        message: impl Into<Arc<str>>,
167        attestation: impl Into<Option<Arc<str>>>,
168        tag: u32,
169        ttl: Duration,
170        prompt: bool,
171    ) -> EmptyResponseFuture<Publish> {
172        let (request, response) = create_request(Publish {
173            topic,
174            message: message.into(),
175            attestation: attestation.into(),
176            ttl_secs: ttl.as_secs() as u32,
177            tag,
178            prompt,
179        });
180
181        self.request(request);
182
183        EmptyResponseFuture::new(response)
184    }
185
186    /// Subscribes on topic to receive messages. The request is resolved
187    /// optimistically as soon as the relay receives it.
188    pub fn subscribe(&self, topic: Topic) -> ResponseFuture<Subscribe> {
189        let (request, response) = create_request(Subscribe { topic });
190
191        self.request(request);
192
193        response
194    }
195
196    /// Subscribes on topic to receive messages. The request is resolved only
197    /// when fully processed by the relay.
198    /// Note: This function is experimental and will likely be removed in the
199    /// future.
200    pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<SubscribeBlocking> {
201        let (request, response) = create_request(SubscribeBlocking { topic });
202
203        self.request(request);
204
205        response
206    }
207
208    /// Unsubscribes from a topic.
209    pub fn unsubscribe(&self, topic: Topic) -> EmptyResponseFuture<Unsubscribe> {
210        let (request, response) = create_request(Unsubscribe { topic });
211
212        self.request(request);
213
214        EmptyResponseFuture::new(response)
215    }
216
217    /// Fetch mailbox messages for a specific topic.
218    pub fn fetch(&self, topic: Topic) -> ResponseFuture<FetchMessages> {
219        let (request, response) = create_request(FetchMessages { topic });
220
221        self.request(request);
222
223        response
224    }
225
226    /// Fetch mailbox messages for a specific topic. Returns a [`Stream`].
227    pub fn fetch_stream(&self, topics: impl Into<Vec<Topic>>) -> FetchMessageStream {
228        FetchMessageStream::new(self.clone(), topics.into())
229    }
230
231    /// Subscribes on multiple topics to receive messages. The request is
232    /// resolved optimistically as soon as the relay receives it.
233    pub fn batch_subscribe(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchSubscribe> {
234        let (request, response) = create_request(BatchSubscribe {
235            topics: topics.into(),
236        });
237
238        self.request(request);
239
240        response
241    }
242
243    /// Subscribes on multiple topics to receive messages. The request is
244    /// resolved only when fully processed by the relay.
245    /// Note: This function is experimental and will likely be removed in the
246    /// future.
247    pub fn batch_subscribe_blocking(
248        &self,
249        topics: impl Into<Vec<Topic>>,
250    ) -> impl Future<Output = SubscriptionResult<Vec<SubscriptionResult<SubscriptionId>>>> {
251        let (request, response) = create_request(BatchSubscribeBlocking {
252            topics: topics.into(),
253        });
254
255        self.request(request);
256
257        async move {
258            Ok(response
259                .await?
260                .into_iter()
261                .map(crate::convert_subscription_result)
262                .collect())
263        }
264    }
265
266    /// Unsubscribes from multiple topics.
267    pub fn batch_unsubscribe(
268        &self,
269        subscriptions: impl Into<Vec<Unsubscribe>>,
270    ) -> EmptyResponseFuture<BatchUnsubscribe> {
271        let (request, response) = create_request(BatchUnsubscribe {
272            subscriptions: subscriptions.into(),
273        });
274
275        self.request(request);
276
277        EmptyResponseFuture::new(response)
278    }
279
280    /// Fetch mailbox messages for multiple topics.
281    pub fn batch_fetch(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchFetchMessages> {
282        let (request, response) = create_request(BatchFetchMessages {
283            topics: topics.into(),
284        });
285
286        self.request(request);
287
288        response
289    }
290
291    /// Acknowledge receipt of messages from a subscribed client.
292    pub async fn batch_receive(
293        &self,
294        receipts: impl Into<Vec<Receipt>>,
295    ) -> ResponseFuture<BatchReceiveMessages> {
296        let (request, response) = create_request(BatchReceiveMessages {
297            receipts: receipts.into(),
298        });
299
300        self.request(request);
301
302        response
303    }
304
305    /// Opens a connection to the Relay.
306    pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), ClientError> {
307        let (tx, rx) = oneshot::channel();
308        let request = opts.as_ws_request()?;
309
310        if self
311            .control_tx
312            .send(ConnectionControl::Connect { request, tx })
313            .is_ok()
314        {
315            rx.await.map_err(|_| ClientError::ChannelClosed)?
316        } else {
317            Err(ClientError::ChannelClosed)
318        }
319    }
320
321    /// Closes the Relay connection.
322    pub async fn disconnect(&self) -> Result<(), ClientError> {
323        let (tx, rx) = oneshot::channel();
324
325        if self
326            .control_tx
327            .send(ConnectionControl::Disconnect { tx })
328            .is_ok()
329        {
330            rx.await.map_err(|_| ClientError::ChannelClosed)?
331        } else {
332            Err(ClientError::ChannelClosed)
333        }
334    }
335
336    pub(crate) fn request(&self, request: OutboundRequest) {
337        if let Err(err) = self
338            .control_tx
339            .send(ConnectionControl::OutboundRequest(request))
340        {
341            let ConnectionControl::OutboundRequest(request) = err.0 else {
342                unreachable!();
343            };
344
345            request.tx.send(Err(ClientError::ChannelClosed)).ok();
346        }
347    }
348}