1use {
2 self::connection::{connection_event_loop, ConnectionControl},
3 crate::{
4 error::{ClientError, Error},
5 ConnectionOptions,
6 },
7 reown_relay_rpc::{
8 domain::{MessageId, SubscriptionId, Topic},
9 rpc::{
10 BatchFetchMessages,
11 BatchReceiveMessages,
12 BatchSubscribe,
13 BatchSubscribeBlocking,
14 BatchUnsubscribe,
15 FetchMessages,
16 Publish,
17 Receipt,
18 Subscribe,
19 SubscribeBlocking,
20 Subscription,
21 SubscriptionError,
22 Unsubscribe,
23 },
24 },
25 std::{future::Future, sync::Arc, time::Duration},
26 tokio::sync::{
27 mpsc::{self, UnboundedSender},
28 oneshot,
29 },
30};
31pub use {fetch::*, inbound::*, outbound::*, stream::*};
/// Error type of the underlying websocket transport.
///
/// On non-wasm targets this is the `tokio_tungstenite` (tungstenite) error.
#[cfg(not(target_arch = "wasm32"))]
pub type TransportError = tokio_tungstenite::tungstenite::Error;
// Re-export the close frame type of the native transport so that users of
// this module do not have to depend on `tokio_tungstenite` directly.
#[cfg(not(target_arch = "wasm32"))]
pub use tokio_tungstenite::tungstenite::protocol::CloseFrame;
/// Error type of the underlying websocket transport.
///
/// On `wasm32` targets this is the `tokio_tungstenite_wasm` error.
#[cfg(target_arch = "wasm32")]
pub type TransportError = tokio_tungstenite_wasm::Error;
// Re-export the close frame type of the wasm transport under the same name
// as on native targets.
#[cfg(target_arch = "wasm32")]
pub use tokio_tungstenite_wasm::CloseFrame;
40
/// Errors specific to the websocket client.
///
/// Most variants wrap the platform-dependent [`TransportError`]; the display
/// text of each variant is given by its `#[error]` attribute.
#[derive(Debug, thiserror::Error)]
pub enum WebsocketClientError {
    /// Connecting failed; carries the transport error that was reported.
    #[error("Failed to connect: {0}")]
    ConnectionFailed(TransportError),

    /// The connection was closed; carries the (possibly absent) close frame.
    #[error("Connection closed: {0}")]
    ConnectionClosed(CloseReason),

    /// Closing the connection failed; carries the transport error.
    #[error("Failed to close connection: {0}")]
    ClosingFailed(TransportError),

    /// A transport error not covered by the more specific variants.
    #[error("Websocket transport error: {0}")]
    Transport(TransportError),

    /// Wraps an `http::Error`; the display message labels it a URL error.
    // NOTE(review): presumably raised while building the connection request —
    // confirm against the code that constructs this variant.
    #[error("Url error: {0}")]
    HttpErr(http::Error),

    /// Reported when there is no connection (per the display message).
    #[error("Not connected")]
    NotConnected,
}
61
62#[derive(Debug, Clone)]
65pub struct CloseReason(pub Option<CloseFrame<'static>>);
66
67impl std::fmt::Display for CloseReason {
68 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
69 if let Some(frame) = &self.0 {
70 frame.fmt(f)
71 } else {
72 f.write_str("<close frame unavailable>")
73 }
74 }
75}
76
// Private submodules. `connection` provides the event loop and control
// messages used by `Client`; the public items of `fetch`, `inbound`,
// `outbound` and `stream` are glob re-exported from this module.
mod connection;
mod fetch;
mod inbound;
mod outbound;
mod stream;
82
83#[derive(Debug, Clone)]
85pub struct PublishedMessage {
86 pub message_id: MessageId,
87 pub subscription_id: SubscriptionId,
88 pub topic: Topic,
89 pub message: Arc<str>,
90 pub tag: u32,
91 pub published_at: chrono::DateTime<chrono::Utc>,
92 pub received_at: chrono::DateTime<chrono::Utc>,
93}
94
95impl PublishedMessage {
96 fn from_request(request: &InboundRequest<Subscription>) -> Self {
97 let Subscription { id, data } = request.data();
98 let now = chrono::Utc::now();
99
100 Self {
101 message_id: request.id(),
102 subscription_id: id.clone(),
103 topic: data.topic.clone(),
104 message: data.message.clone(),
105 tag: data.tag,
106 published_at: now,
108 received_at: now,
109 }
110 }
111}
112
/// Callbacks through which the connection event loop reports events.
///
/// A handler is handed to [`Client::new`], which passes it to the connection
/// event loop. Only [`message_received`](Self::message_received) must be
/// implemented; every other method defaults to a no-op.
///
/// The call sites live in the `connection` module, outside this file, so the
/// "when" of each callback below is inferred from its name and signature.
pub trait ConnectionHandler: Send + 'static {
    /// Presumably called once a connection has been established. No-op by
    /// default.
    fn connected(&mut self) {}

    /// Presumably called when the connection is closed, with the close frame
    /// if one is available. No-op by default.
    fn disconnected(&mut self, _frame: Option<CloseFrame<'static>>) {}

    /// Receives a message delivered for one of the client's subscriptions.
    /// This is the only required method.
    fn message_received(&mut self, message: PublishedMessage);

    /// Presumably called for errors on the inbound (receiving) side of the
    /// connection. No-op by default.
    fn inbound_error(&mut self, _error: ClientError) {}

    /// Presumably called for errors on the outbound (sending) side of the
    /// connection. No-op by default.
    fn outbound_error(&mut self, _error: ClientError) {}
}
132
/// Shorthand for a result whose error is an [`Error`] carrying a
/// [`SubscriptionError`].
type SubscriptionResult<T> = Result<T, Error<SubscriptionError>>;
134
/// Handle used to issue requests to the connection event loop.
///
/// The handle only holds the sending half of the control channel, so clones
/// send to the same event loop.
#[derive(Debug, Clone)]
pub struct Client {
    /// Sender for control messages (connect, disconnect, outbound requests)
    /// consumed by the connection event loop.
    control_tx: UnboundedSender<ConnectionControl>,
}
143
144impl Client {
145 pub fn new<T>(handler: T) -> Self
147 where
148 T: ConnectionHandler,
149 {
150 let (control_tx, control_rx) = mpsc::unbounded_channel();
151
152 let fut = connection_event_loop(control_rx, handler);
153 #[cfg(target_arch = "wasm32")]
154 wasm_bindgen_futures::spawn_local(fut);
155
156 #[cfg(not(target_arch = "wasm32"))]
157 tokio::spawn(fut);
158
159 Self { control_tx }
160 }
161
162 pub fn publish(
164 &self,
165 topic: Topic,
166 message: impl Into<Arc<str>>,
167 attestation: impl Into<Option<Arc<str>>>,
168 tag: u32,
169 ttl: Duration,
170 prompt: bool,
171 ) -> EmptyResponseFuture<Publish> {
172 let (request, response) = create_request(Publish {
173 topic,
174 message: message.into(),
175 attestation: attestation.into(),
176 ttl_secs: ttl.as_secs() as u32,
177 tag,
178 prompt,
179 });
180
181 self.request(request);
182
183 EmptyResponseFuture::new(response)
184 }
185
186 pub fn subscribe(&self, topic: Topic) -> ResponseFuture<Subscribe> {
189 let (request, response) = create_request(Subscribe { topic });
190
191 self.request(request);
192
193 response
194 }
195
196 pub fn subscribe_blocking(&self, topic: Topic) -> ResponseFuture<SubscribeBlocking> {
201 let (request, response) = create_request(SubscribeBlocking { topic });
202
203 self.request(request);
204
205 response
206 }
207
208 pub fn unsubscribe(&self, topic: Topic) -> EmptyResponseFuture<Unsubscribe> {
210 let (request, response) = create_request(Unsubscribe { topic });
211
212 self.request(request);
213
214 EmptyResponseFuture::new(response)
215 }
216
217 pub fn fetch(&self, topic: Topic) -> ResponseFuture<FetchMessages> {
219 let (request, response) = create_request(FetchMessages { topic });
220
221 self.request(request);
222
223 response
224 }
225
226 pub fn fetch_stream(&self, topics: impl Into<Vec<Topic>>) -> FetchMessageStream {
228 FetchMessageStream::new(self.clone(), topics.into())
229 }
230
231 pub fn batch_subscribe(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchSubscribe> {
234 let (request, response) = create_request(BatchSubscribe {
235 topics: topics.into(),
236 });
237
238 self.request(request);
239
240 response
241 }
242
243 pub fn batch_subscribe_blocking(
248 &self,
249 topics: impl Into<Vec<Topic>>,
250 ) -> impl Future<Output = SubscriptionResult<Vec<SubscriptionResult<SubscriptionId>>>> {
251 let (request, response) = create_request(BatchSubscribeBlocking {
252 topics: topics.into(),
253 });
254
255 self.request(request);
256
257 async move {
258 Ok(response
259 .await?
260 .into_iter()
261 .map(crate::convert_subscription_result)
262 .collect())
263 }
264 }
265
266 pub fn batch_unsubscribe(
268 &self,
269 subscriptions: impl Into<Vec<Unsubscribe>>,
270 ) -> EmptyResponseFuture<BatchUnsubscribe> {
271 let (request, response) = create_request(BatchUnsubscribe {
272 subscriptions: subscriptions.into(),
273 });
274
275 self.request(request);
276
277 EmptyResponseFuture::new(response)
278 }
279
280 pub fn batch_fetch(&self, topics: impl Into<Vec<Topic>>) -> ResponseFuture<BatchFetchMessages> {
282 let (request, response) = create_request(BatchFetchMessages {
283 topics: topics.into(),
284 });
285
286 self.request(request);
287
288 response
289 }
290
291 pub async fn batch_receive(
293 &self,
294 receipts: impl Into<Vec<Receipt>>,
295 ) -> ResponseFuture<BatchReceiveMessages> {
296 let (request, response) = create_request(BatchReceiveMessages {
297 receipts: receipts.into(),
298 });
299
300 self.request(request);
301
302 response
303 }
304
305 pub async fn connect(&self, opts: &ConnectionOptions) -> Result<(), ClientError> {
307 let (tx, rx) = oneshot::channel();
308 let request = opts.as_ws_request()?;
309
310 if self
311 .control_tx
312 .send(ConnectionControl::Connect { request, tx })
313 .is_ok()
314 {
315 rx.await.map_err(|_| ClientError::ChannelClosed)?
316 } else {
317 Err(ClientError::ChannelClosed)
318 }
319 }
320
321 pub async fn disconnect(&self) -> Result<(), ClientError> {
323 let (tx, rx) = oneshot::channel();
324
325 if self
326 .control_tx
327 .send(ConnectionControl::Disconnect { tx })
328 .is_ok()
329 {
330 rx.await.map_err(|_| ClientError::ChannelClosed)?
331 } else {
332 Err(ClientError::ChannelClosed)
333 }
334 }
335
336 pub(crate) fn request(&self, request: OutboundRequest) {
337 if let Err(err) = self
338 .control_tx
339 .send(ConnectionControl::OutboundRequest(request))
340 {
341 let ConnectionControl::OutboundRequest(request) = err.0 else {
342 unreachable!();
343 };
344
345 request.tx.send(Err(ClientError::ChannelClosed)).ok();
346 }
347 }
348}