1#![doc = include_str!("../README.md")]
3
4#[doc(hidden)]
5mod cipher;
6#[doc(hidden)]
7mod did;
8#[doc(hidden)]
9mod error;
10mod event;
11mod jwt;
12mod lib_tests;
13#[doc(hidden)]
14mod macros;
15#[doc(hidden)]
16mod metadata;
17#[doc(hidden)]
18pub mod prelude;
19#[doc(hidden)]
20mod rpc;
21#[doc(hidden)]
22mod serde_helpers;
23#[doc(hidden)]
24mod utils;
25#[doc(hidden)]
26mod watch;
27
28pub use crate::error::Error as WalletConnectError;
29
30use self::{
31 jwt::decode::{client_id::DecodedClientId, MessageId, ProjectId, Topic},
32 metadata::{Metadata, Session},
33 rpc::{
34 ErrorResponse, RequestPayload, Response, ResponseParams, SuccessfulResponse,
35 TAG_SESSION_PROPOSE_REQUEST, TAG_SESSION_REQUEST_REQUEST, TAG_SESSION_SETTLE_RESPONSE,
36 },
37};
38use std::{collections::HashMap, sync::Arc};
39
40use crate::{
41 cipher::Cipher,
42 error::Error,
43 jwt::{
44 decode::sym_key::DecodedSymKey, AuthToken, SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS,
45 },
46 metadata::SessionPropose,
47};
48use chrono::{Duration, Utc};
49use ed25519_dalek::SigningKey;
50use ethers::{
51 providers::JsonRpcError,
52 types::{Address, H160},
53};
54use futures::{
55 channel::mpsc::{self, UnboundedSender},
56 Sink, SinkExt, Stream, StreamExt,
57};
58use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
59use log::{debug, error};
60use metadata::{Method, SessionAccount, SessionRpcRequest};
61use rand::prelude::ThreadRng;
62use rpc::{TAG_SESSION_DELETE_RESPONSE, TAG_SESSION_EVENT_RESPONSE, TAG_SESSION_UPDATE_RESPONSE};
63use serde::{Deserialize, Serialize};
64use url::Url;
65use wasm_bindgen::__rt::WasmRefCell;
66use x25519_dalek::{PublicKey, StaticSecret};
67
68#[derive(Clone, Serialize, Deserialize)]
69pub struct WalletConnectState {
70 pub state: State,
71 pub keys: Vec<(Topic, StaticSecret)>,
72 pub session: Session,
73}
74
75#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
77pub enum State {
78 Connecting,
80 InitialSubscription(Topic),
82 SessionProposed(Topic),
84 SwitchingTopic(Topic),
86 AwaitingSettlement(Topic),
88 Connected(Topic),
90 Disconnected,
92}
93
94impl State {
95 pub fn is_connected(&self) -> bool {
96 match self {
97 Self::Connected(_) => true,
98 _ => false,
99 }
100 }
101}
102
103#[derive(Debug, Clone, Default)]
105pub struct MessageIdGenerator {
106 next: u64,
107}
108
109impl MessageIdGenerator {
110 pub fn new() -> Self {
111 Self::default()
112 }
113
114 pub fn next(&self) -> MessageId {
115 let next = self.next;
116 let timestamp = chrono::Utc::now().timestamp_millis() as u64;
117 let id = timestamp << 8 | next;
118
119 MessageId::new(id)
120 }
121}
122
123#[derive(Debug, Clone)]
124enum WalletConnectResponse {
125 Value(serde_json::Value),
126 Error(JsonRpcError),
127}
128
129#[derive(Clone)]
130struct ClientState {
131 pub cipher: Cipher<ThreadRng>,
132 pub subscriptions: HashMap<Topic, String>,
133 pub pending: HashMap<MessageId, rpc::Params>,
134 pub requests_pending: HashMap<MessageId, UnboundedSender<WalletConnectResponse>>,
135 pub state: State,
136 pub session: Session,
137}
138
139#[derive(Clone)]
141pub struct WalletConnect {
142 sink: Arc<WasmRefCell<dyn Sink<Message, Error = WebSocketError> + 'static + Unpin>>,
143 stream: Arc<WasmRefCell<dyn Stream<Item = Result<Message, WebSocketError>> + 'static + Unpin>>,
144 id_generator: MessageIdGenerator,
145 state: Arc<WasmRefCell<ClientState>>,
146 chain_id: u64,
147}
148
149impl WalletConnect {
150 pub fn connect(
152 project_id: ProjectId,
153 chain_id: u64,
154 metadata: Metadata,
155 stored_state: Option<WalletConnectState>,
156 ) -> Result<Self, Error> {
157 let key = SigningKey::generate(&mut rand::thread_rng());
158 let auth = AuthToken::new(&metadata.url).as_jwt(&key).map_err(|_| Error::Token)?;
159
160 #[derive(Serialize)]
161 #[serde(rename_all = "camelCase")]
162 struct QueryParams<'a> {
163 project_id: &'a ProjectId,
164 auth: &'a SerializedAuthToken,
165 }
166
167 let query = serde_qs::to_string(&QueryParams { project_id: &project_id, auth: &auth })
168 .map_err(|_| Error::Query)?;
169
170 let mut url = Url::parse(RELAY_WEBSOCKET_ADDRESS).map_err(|_| Error::Url)?;
171 url.set_query(Some(&query));
172
173 let ws = WebSocket::open(url.as_str())?;
174 let (sink, stream) = ws.split();
175
176 let (keys, state, session) = match stored_state {
177 None => (None, State::Connecting, Session::from(metadata, chain_id)),
178 Some(ref s) => (Some(s.keys.clone()), s.state.clone(), s.session.clone()),
179 };
180
181 Ok(Self {
182 sink: Arc::new(WasmRefCell::new(sink)),
183 stream: Arc::new(WasmRefCell::new(stream)),
184 id_generator: MessageIdGenerator::default(),
185 state: Arc::new(WasmRefCell::new(ClientState {
186 cipher: Cipher::new(keys, ThreadRng::default()),
187 subscriptions: HashMap::new(),
188 pending: HashMap::new(),
189 requests_pending: HashMap::new(),
190 state,
191 session,
192 })),
193 chain_id,
194 })
195 }
196
197 pub fn get_state(&self) -> WalletConnectState {
199 let state = (*self.state).borrow();
200 WalletConnectState {
201 state: state.state.clone(),
202 keys: state.cipher.keys.clone().into_iter().collect::<Vec<_>>(),
203 session: state.session.clone(),
204 }
205 }
206
207 pub fn set_chain_id(&mut self, chain_id: u64) {
209 self.chain_id = chain_id;
210 }
211
212 pub async fn disconnect(&self) -> Result<(), Error> {
214 let mut state = (*self.state).borrow_mut();
216 state.cipher.clear();
217 state.pending.clear();
218 state.requests_pending.clear();
219
220 state.state = State::Disconnected;
225
226 Ok(())
227 }
228
229 pub fn can_send(&self) -> bool {
232 match self.state.borrow().session.namespace() {
233 Some(namespace) => namespace.methods.contains(&Method::SendTransaction),
234 None => false,
235 }
236 }
237
238 pub fn supports_method(&self, method: &str) -> bool {
240 if let Ok(method) = method.parse::<Method>() {
241 return match self.state.borrow().session.namespace() {
242 Some(namespace) => namespace.methods.contains(&method),
243 None => false,
244 };
245 }
246
247 false
248 }
249
250 pub fn get_account(&self) -> Option<H160> {
252 if let Some(accounts) = self.get_accounts_for_chain_id(self.chain_id()) {
253 if let Some(account) = accounts.iter().nth(0) {
254 return Some(*account);
255 }
256 }
257 None
258 }
259
260 pub fn get_accounts(&self) -> Option<Vec<SessionAccount>> {
262 if let Some(namespace) = self.state.borrow().session.namespace() {
263 return namespace.accounts.clone();
264 }
265 None
266 }
267
268 pub fn available_networks(&self) -> Vec<u64> {
270 self.state.borrow().session.available_networks()
271 }
272
273 pub fn get_accounts_for_chain_id(&self, chain_id: u64) -> Option<Vec<Address>> {
276 if let Some(namespace) = self.state.borrow().session.namespace() {
277 if let Some(accounts) = &namespace.accounts {
278 if !accounts.is_empty() {
279 let chain_id = metadata::Chain::Eip155(chain_id);
280 return Some(
281 accounts
282 .iter()
283 .filter_map(|acc| {
284 if acc.chain == chain_id {
285 Some(acc.account)
286 } else {
287 None
288 }
289 })
290 .collect::<Vec<_>>(),
291 );
292 }
293 }
294 }
295 None
296 }
297
298 pub fn chain_id(&self) -> u64 {
300 self.state.borrow().session.chain_id
301 }
302
303 pub fn address(&self) -> Address {
305 if let Some(account) = self.get_account() {
306 account
307 } else {
308 H160::zero()
309 }
310 }
311
312 pub async fn initiate_session(
314 &self,
315 initial_topics: Option<Vec<Topic>>,
316 ) -> Result<String, Error> {
317 let mut result = String::new();
318 if let Some(topics) = initial_topics {
319 for topic in topics {
320 self.subscribe(topic).await?;
321 }
322 } else {
323 let topic;
324 let key;
325 {
326 let mut state = (*self.state).borrow_mut();
327 (topic, key) = state.cipher.generate();
328 let pub_key = PublicKey::from(&key);
329 state.session.proposer.public_key = DecodedClientId::from_key(&pub_key).to_hex();
330 }
331 self.subscribe(topic.clone()).await?;
332 {
333 let mut state = (*self.state).borrow_mut();
334 state.state = State::InitialSubscription(topic.clone());
335 }
336 result = format!(
337 "wc:{}@2?relay-protocol=irn&symKey={}",
338 topic,
339 DecodedSymKey::from_key(&key.to_bytes())
340 );
341 }
342
343 Ok(result)
344 }
345
346 pub async fn subscribe(&self, topic: Topic) -> Result<(), Error> {
348 self.send(&rpc::Subscribe { topic }).await?;
349 Ok(())
350 }
351
352 pub async fn next_from_stream(&self) -> Result<Response, Error> {
354 let mut stream = (*self.stream).borrow_mut();
355 match stream.next().await {
356 Some(Ok(Message::Bytes(_))) => Err(Error::BadResponse),
357 Some(Ok(Message::Text(text))) => Ok(serde_json::from_str::<Response>(&text)?),
358 Some(Err(err)) => {
359 error!("{}", err);
360 Err(Error::BadResponse)
361 }
362
363 None => Err(Error::Disconnected),
364 }
365 }
366
367 pub async fn next(&self) -> Result<Option<event::Event>, Error> {
368 let s = (*self.state).borrow().state.clone();
369 if s == State::Disconnected {
370 return Err(Error::Disconnected);
371 }
372
373 let old_chain_id = self.chain_id();
374 let old_accounts = self.get_accounts_for_chain_id(old_chain_id);
375 let was_connected = s.is_connected();
376 if let Ok(resp) = self.next_from_stream().await {
377 match resp {
378 Response::Success(resp) => {
379 _ = self.process_response(&resp).await;
380 }
381 Response::Error(err) => {
382 _ = self.process_error_response(&err).await;
383 }
384 Response::RPCResponse(req) => {
385 let handled = match self.decrypt_params(req.params).await {
386 Ok(_) => true,
387 Err(err) => {
388 error!("Failed to receive {err:?}");
389 false
390 }
391 };
392 _ = self.respond(req.id, handled).await;
393 }
394 }
395 } else {
396 error!("We've got disconnected");
397 return Ok(Some(event::Event::Broken));
398 }
399
400 let is_connected = (*self.state).borrow().state.is_connected();
401 if was_connected != is_connected {
402 Ok(Some(if is_connected {
403 event::Event::Connected
404 } else {
405 event::Event::Disconnected
406 }))
407 } else {
408 let new_chain_id = self.chain_id();
410 if old_chain_id != new_chain_id {
411 return Ok(Some(event::Event::ChainIdChanged(new_chain_id)));
412 } else {
413 let new_accounts = self.get_accounts_for_chain_id(new_chain_id);
414 if old_accounts != new_accounts {
415 return Ok(Some(event::Event::AccountsChanged(new_accounts)));
416 }
417 }
418 Ok(None)
419 }
420 }
421
422 pub async fn publish<T: rpc::SessionPayload>(
424 &self,
425 topic: &Topic,
426 request: &T,
427 ttl: Duration,
428 tag: u32,
429 prompt: bool,
430 ) -> Result<MessageId, Error> {
431 let id = self.id_generator.next();
432 let ttl_secs = ttl.num_seconds().try_into().map_err(|_| Error::BadParam)?;
433 let payload = rpc::Payload::SessionRequest(rpc::SessionRequest {
434 id,
435 jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
436 params: request.clone().into_params(),
437 });
438 let req = rpc::Publish {
439 topic: topic.clone(),
440 message: (*self.state).borrow().cipher.encode(topic, &payload)?,
441 ttl_secs,
442 tag,
443 prompt,
444 };
445 self.send(&req).await?;
446 Ok(id)
447 }
448
449 pub async fn request(
451 &self,
452 method: &str,
453 params: Option<serde_json::Value>,
454 chain_id: u64,
455 ) -> Result<serde_json::Value, Error> {
456 let topic = match &(*self.state).borrow().state {
457 State::Connected(ref topic) => Ok(topic.clone()),
458 _ => Err(Error::Disconnected),
459 }?;
460 let message_id = self
461 .publish(
462 &topic,
463 &SessionRpcRequest::new(method, params, chain_id),
464 Duration::minutes(5),
465 TAG_SESSION_REQUEST_REQUEST,
466 true,
467 )
468 .await?;
469
470 let (tx, mut rx) = mpsc::unbounded::<WalletConnectResponse>();
471 (*self.state).borrow_mut().requests_pending.insert(message_id, tx);
472
473 let ret = rx.next().await;
474 match ret {
475 Some(value) => match value {
476 WalletConnectResponse::Value(v) => Ok(v),
477 WalletConnectResponse::Error(error) => Err(Error::WalletError(error)),
478 },
479 None => Err(Error::BadResponse),
480 }
481 }
482
483 pub async fn wallet_respond(
485 &self,
486 topic: &Topic,
487 id: MessageId,
488 result: bool,
489 ttl: Duration,
490 tag: u32,
491 prompt: bool,
492 ) -> Result<(), Error> {
493 let state = (*self.state).borrow().clone();
494 let ttl_secs = ttl.num_seconds().try_into().map_err(|_| Error::BadParam)?;
495 let payload = rpc::SessionResponse {
496 id,
497 jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
498 result: rpc::SessionResultParams::Boolean(result),
499 };
500 let req = rpc::Publish {
501 topic: topic.clone(),
502 message: state.cipher.encode(topic, &payload)?,
503 ttl_secs,
504 tag,
505 prompt,
506 };
507 self.send(&req).await?;
508 Ok(())
509 }
510
511 pub async fn send<T: RequestPayload>(&self, request: &T) -> Result<(), Error> {
513 let id = self.id_generator.next();
514 let params = request.clone().into_params();
515 let payload = rpc::Payload::Request(rpc::Request {
516 id,
517 jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
518 params: params.clone(),
519 });
520 let mut state = (*self.state).borrow_mut();
521 state.pending.insert(id, params);
522 let serialized_payload = serde_json::to_string(&payload)?;
523 (*self.sink).borrow_mut().send(Message::Text(serialized_payload)).await?;
524 Ok(())
525 }
526
527 pub async fn respond(&self, id: MessageId, success: bool) -> Result<(), Error> {
529 let payload = Response::Success(SuccessfulResponse {
530 id,
531 jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
532 result: serde_json::Value::Bool(success),
533 });
534 let serialized_payload = serde_json::to_string(&payload)?;
535 (*self.sink).borrow_mut().send(Message::Text(serialized_payload)).await?;
536 Ok(())
537 }
538
539 async fn decrypt_params(&self, params: ResponseParams) -> Result<(), Error> {
540 match params {
541 ResponseParams::Publish(payload) => {
542 self.consume_message(&payload.topic, &payload.message).await
543 }
544 ResponseParams::Subscription(payload) => {
545 self.consume_message(&payload.data.topic, &payload.data.message).await
546 }
547 }
548 }
549
550 async fn consume_message(&self, topic: &Topic, payload: &str) -> Result<(), Error> {
551 debug!(
552 "Received message {:?}",
553 (*self.state).borrow().cipher.decode_to_string(topic, payload)?
554 );
555 let request = (*self.state).borrow().cipher.decode(topic, payload)?;
556
557 match request {
558 rpc::SessionMessage::Error(session_error) => {
559 let mut state = (*self.state).borrow_mut();
560 match state.requests_pending.remove(&session_error.id) {
561 Some(mut tx) => {
562 _ = tx
563 .send(WalletConnectResponse::Error(
564 session_error.error.as_error_response(),
565 ))
566 .await;
567 }
568 None => {}
569 }
570 Ok(())
571 }
572 rpc::SessionMessage::Response(response) => match response.result {
573 rpc::SessionResultParams::Responder(responder) => {
574 let sub_topic;
575 {
576 let mut state = (*self.state).borrow_mut();
577 let (new_topic, _) = state.cipher.create_common_topic(
578 topic,
579 DecodedClientId::from_hex(&responder.responder_public_key)?,
580 )?;
581 sub_topic = new_topic.clone();
582 state.state = State::SwitchingTopic(new_topic);
583 }
584 self.subscribe(sub_topic.clone()).await?;
585 Ok(())
586 }
587 rpc::SessionResultParams::Response(resp) => {
588 let mut state = (*self.state).borrow_mut();
589 if let Some(mut tx) = state.requests_pending.remove(&response.id) {
590 _ = tx.send(WalletConnectResponse::Value(resp)).await;
591 }
592
593 Ok(())
594 }
595 _ => {
596 debug!("Received unhandled result: {:?}", response.result);
597 Ok(())
598 }
599 },
600 rpc::SessionMessage::Message(message) => {
601 self.handle_message(topic, &message).await?;
602 Ok(())
603 }
604 }
605 }
606
607 async fn process_response(&self, response: &SuccessfulResponse) -> Result<(), Error> {
608 let mut propose_topic = None;
609 let mut propose: Option<SessionPropose> = None;
610 {
611 let mut state = (*self.state).borrow_mut();
612 let potential_params = state.pending.remove(&response.id);
614 if let Some(params) = potential_params {
615 match params {
616 rpc::Params::Publish(_) => {}
617 rpc::Params::Subscribe(sub) => {
618 let topic = sub.topic.clone();
619 let sub_hash = response.result.to_string();
620 state.subscriptions.insert(topic.clone(), sub_hash);
621 match &state.state {
622 State::InitialSubscription(awaiting_topic) => {
623 if topic == *awaiting_topic {
624 state.state = State::SessionProposed(topic.clone());
625 propose_topic = Some(topic.clone());
626 propose = Some(state.session.clone().into());
627 }
628 }
629 State::SwitchingTopic(awaiting_topic) => {
630 if topic == *awaiting_topic {
631 state.state = State::AwaitingSettlement(topic);
632 }
633 }
634 _ => {}
635 }
636 }
637 _ => {}
638 }
639 }
640 }
641
642 if let (Some(topic), Some(propose)) = (propose_topic, propose) {
643 _ = self
644 .publish(&topic, &propose, Duration::minutes(5), TAG_SESSION_PROPOSE_REQUEST, true)
645 .await?;
646 }
647 Ok(())
648 }
649
650 async fn process_error_response(&self, response: &ErrorResponse) -> Result<(), Error> {
651 debug!("Error {response:?}");
652 let mut state = (*self.state).borrow_mut();
653 if let Some(_) = state.pending.remove(&response.id) {
654 error!("Received error response from server {response:?}");
655
656 }
658 Ok(())
659 }
660
661 async fn handle_message(
662 &self,
663 topic: &Topic,
664 request: &rpc::WalletRequest,
665 ) -> Result<(), Error> {
666 let s = (*self.state).borrow().state.clone();
667 match request.params {
668 rpc::WalletMessage::Ping(_) => {}
669 rpc::WalletMessage::Settlement(ref settlement) => {
670 if let State::AwaitingSettlement(settled_topic) = &s {
671 {
672 let mut state = (*self.state).borrow_mut();
673
674 state.session.settle(settlement);
675 state.state = State::Connected(settled_topic.clone());
676 let now = Utc::now();
677 let expires_in = state.session.expiry.unwrap() - now;
678 debug!(
680 "Session expires at {:?} that is in {:?} seconds",
681 state.session.expiry, expires_in
682 );
683 }
684 self.wallet_respond(
686 topic,
687 request.id,
688 true,
689 Duration::minutes(5),
690 TAG_SESSION_SETTLE_RESPONSE,
691 false,
692 )
693 .await?;
694 }
695 }
696 rpc::WalletMessage::Update(ref update) => {
697 {
698 let mut state = (*self.state).borrow_mut();
699 state.session.update(update);
700 }
701 debug!("Updated, responding");
702 self.wallet_respond(
703 topic,
704 request.id,
705 true,
706 Duration::minutes(5),
707 TAG_SESSION_UPDATE_RESPONSE,
708 false,
709 )
710 .await?;
711 }
712 rpc::WalletMessage::Event(ref event) => {
713 {
714 let mut state = (*self.state).borrow_mut();
715 state.session.event(event);
716 }
717 self.wallet_respond(
718 topic,
719 request.id,
720 true,
721 Duration::minutes(5),
722 TAG_SESSION_EVENT_RESPONSE,
723 false,
724 )
725 .await?;
726 }
727 rpc::WalletMessage::Delete(_) => {
728 {
729 let mut state = (*self.state).borrow_mut();
730 state.session.close();
731 state.state = State::Disconnected;
732 }
733 self.wallet_respond(
734 topic,
735 request.id,
736 true,
737 Duration::minutes(5),
738 TAG_SESSION_DELETE_RESPONSE,
739 false,
740 )
741 .await?;
742 }
743 }
744 Ok(())
745 }
746}