#![doc = include_str!("../README.md")]
#[doc(hidden)]
mod cipher;
#[doc(hidden)]
mod did;
#[doc(hidden)]
mod error;
mod event;
mod jwt;
mod lib_tests;
#[doc(hidden)]
mod macros;
#[doc(hidden)]
mod metadata;
#[doc(hidden)]
pub mod prelude;
#[doc(hidden)]
mod rpc;
#[doc(hidden)]
mod serde_helpers;
#[doc(hidden)]
mod utils;
#[doc(hidden)]
mod watch;
pub use crate::error::Error as WalletConnectError;
use self::{
jwt::decode::{client_id::DecodedClientId, MessageId, ProjectId, Topic},
metadata::{Metadata, Session},
rpc::{
ErrorResponse, RequestPayload, Response, ResponseParams, SuccessfulResponse,
TAG_SESSION_PROPOSE_REQUEST, TAG_SESSION_REQUEST_REQUEST, TAG_SESSION_SETTLE_RESPONSE,
},
};
use std::{collections::HashMap, sync::Arc};
use crate::{
cipher::Cipher,
error::Error,
jwt::{
decode::sym_key::DecodedSymKey, AuthToken, SerializedAuthToken, RELAY_WEBSOCKET_ADDRESS,
},
metadata::SessionPropose,
};
use chrono::{Duration, Utc};
use ed25519_dalek::SigningKey;
use ethers::{
providers::JsonRpcError,
types::{Address, H160},
};
use futures::{
channel::mpsc::{self, UnboundedSender},
Sink, SinkExt, Stream, StreamExt,
};
use gloo_net::websocket::{futures::WebSocket, Message, WebSocketError};
use log::{debug, error};
use metadata::{Method, SessionAccount, SessionRpcRequest};
use rand::prelude::ThreadRng;
use rpc::{TAG_SESSION_DELETE_RESPONSE, TAG_SESSION_EVENT_RESPONSE, TAG_SESSION_UPDATE_RESPONSE};
use serde::{Deserialize, Serialize};
use url::Url;
use wasm_bindgen::__rt::WasmRefCell;
use x25519_dalek::{PublicKey, StaticSecret};
/// Serializable snapshot of a client, as produced by `WalletConnect::get_state`
/// and accepted by `WalletConnect::connect` to restore a previous client.
///
/// NOTE(review): `keys` carries the raw `StaticSecret`s and this type is
/// `Serialize`; callers persisting it should presumably treat the output as
/// sensitive — confirm against how the snapshot is stored.
#[derive(Clone, Serialize, Deserialize)]
pub struct WalletConnectState {
/// Protocol state at the time the snapshot was taken.
pub state: State,
/// Per-topic secrets copied out of the client's cipher.
pub keys: Vec<(Topic, StaticSecret)>,
/// Session data at the time the snapshot was taken.
pub session: Session,
}
/// Connection/handshake state of a `WalletConnect` client.
///
/// The variants are listed in the order the client moves through them during
/// a fresh pairing; the `Topic` payload is the topic the step refers to.
#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
pub enum State {
/// Starting state of a client created without a stored state.
Connecting,
/// `initiate_session` has sent a subscription for a freshly generated topic.
InitialSubscription(Topic),
/// The relay acknowledged the initial subscription; a session proposal is
/// published on this topic.
SessionProposed(Topic),
/// The responder's public key was received and a subscription to the derived
/// common topic has been requested.
SwitchingTopic(Topic),
/// The relay acknowledged the subscription to the common topic; waiting for
/// the settlement message.
AwaitingSettlement(Topic),
/// A settlement message was received; requests can be sent on this topic.
Connected(Topic),
/// Set by `disconnect` or after a session delete message.
Disconnected,
}
impl State {
    /// Returns `true` only for the [`State::Connected`] variant.
    pub fn is_connected(&self) -> bool {
        matches!(self, Self::Connected(_))
    }
}
#[derive(Debug, Clone, Default)]
pub struct MessageIdGenerator {
next: u64,
}
impl MessageIdGenerator {
pub fn new() -> Self {
Self::default()
}
pub fn next(&self) -> MessageId {
let next = self.next;
let timestamp = chrono::Utc::now().timestamp_millis() as u64;
let id = timestamp << 8 | next;
MessageId::new(id)
}
}
/// Outcome of a wallet request, delivered through the channel registered in
/// `ClientState::requests_pending`.
#[derive(Debug, Clone)]
enum WalletConnectResponse {
/// The wallet answered with a JSON result.
Value(serde_json::Value),
/// The wallet answered with a JSON-RPC error.
Error(JsonRpcError),
}
/// Mutable state shared by all clones of a `WalletConnect` client.
#[derive(Clone)]
struct ClientState {
/// Cipher used to encode outgoing and decode incoming session messages.
pub cipher: Cipher<ThreadRng>,
/// Acknowledged subscriptions: topic -> the relay's result for the subscribe
/// request, stored as a string.
pub subscriptions: HashMap<Topic, String>,
/// Relay requests that were sent and have not been answered yet, keyed by the
/// id of the relay request.
pub pending: HashMap<MessageId, rpc::Params>,
/// Wallet requests awaiting an answer, keyed by the id of the session
/// request; the sender hands the answer to the waiting `request` call.
pub requests_pending: HashMap<MessageId, UnboundedSender<WalletConnectResponse>>,
/// Current handshake/connection state.
pub state: State,
/// Current session data.
pub session: Session,
}
/// WalletConnect client talking to the relay over a websocket.
///
/// Cloning is cheap: the sink, the stream and the client state are behind
/// `Arc`s, so clones operate on the same connection and state.
#[derive(Clone)]
pub struct WalletConnect {
// Write half of the websocket.
sink: Arc<WasmRefCell<dyn Sink<Message, Error = WebSocketError> + 'static + Unpin>>,
// Read half of the websocket.
stream: Arc<WasmRefCell<dyn Stream<Item = Result<Message, WebSocketError>> + 'static + Unpin>>,
// Source of ids for outgoing requests.
id_generator: MessageIdGenerator,
// Shared mutable client state.
state: Arc<WasmRefCell<ClientState>>,
// Chain id passed to `connect` / `set_chain_id`.
// NOTE(review): the `chain_id()` getter reads `session.chain_id` instead of
// this field — confirm which one is meant to be authoritative.
chain_id: u64,
}
impl WalletConnect {
pub fn connect(
project_id: ProjectId,
chain_id: u64,
metadata: Metadata,
stored_state: Option<WalletConnectState>,
) -> Result<Self, Error> {
let key = SigningKey::generate(&mut rand::thread_rng());
let auth = AuthToken::new(&metadata.url).as_jwt(&key).map_err(|_| Error::Token)?;
#[derive(Serialize)]
#[serde(rename_all = "camelCase")]
struct QueryParams<'a> {
project_id: &'a ProjectId,
auth: &'a SerializedAuthToken,
}
let query = serde_qs::to_string(&QueryParams { project_id: &project_id, auth: &auth })
.map_err(|_| Error::Query)?;
let mut url = Url::parse(RELAY_WEBSOCKET_ADDRESS).map_err(|_| Error::Url)?;
url.set_query(Some(&query));
let ws = WebSocket::open(url.as_str())?;
let (sink, stream) = ws.split();
let (keys, state, session) = match stored_state {
None => (None, State::Connecting, Session::from(metadata, chain_id)),
Some(ref s) => (Some(s.keys.clone()), s.state.clone(), s.session.clone()),
};
Ok(Self {
sink: Arc::new(WasmRefCell::new(sink)),
stream: Arc::new(WasmRefCell::new(stream)),
id_generator: MessageIdGenerator::default(),
state: Arc::new(WasmRefCell::new(ClientState {
cipher: Cipher::new(keys, ThreadRng::default()),
subscriptions: HashMap::new(),
pending: HashMap::new(),
requests_pending: HashMap::new(),
state,
session,
})),
chain_id,
})
}
/// Takes a snapshot of the current state, cipher keys and session.
///
/// The snapshot can be handed to `connect` as `stored_state`.
pub fn get_state(&self) -> WalletConnectState {
    let client = (*self.state).borrow();
    let keys = client.cipher.keys.clone().into_iter().collect();
    WalletConnectState {
        state: client.state.clone(),
        keys,
        session: client.session.clone(),
    }
}
/// Overwrites the client's own `chain_id` field.
///
/// NOTE(review): only this handle's field changes; the session is untouched
/// and the `chain_id()` getter reads `session.chain_id`, so the getter does
/// not reflect this call — confirm that this is intended.
pub fn set_chain_id(&mut self, chain_id: u64) {
self.chain_id = chain_id;
}
/// Resets the local client state and marks the client as disconnected.
///
/// Cipher keys, unanswered relay requests and waiting wallet requests are
/// dropped. Nothing is sent over the websocket by this method, and it always
/// returns `Ok(())`.
pub async fn disconnect(&self) -> Result<(), Error> {
    let mut guard = (*self.state).borrow_mut();
    let client = &mut *guard;
    client.state = State::Disconnected;
    client.requests_pending.clear();
    client.pending.clear();
    client.cipher.clear();
    Ok(())
}
/// Tells whether the session namespace lists `Method::SendTransaction`.
///
/// Returns `false` when the session has no namespace.
pub fn can_send(&self) -> bool {
    self.state
        .borrow()
        .session
        .namespace()
        .map_or(false, |ns| ns.methods.contains(&Method::SendTransaction))
}
/// Tells whether the session namespace lists the method called `method`.
///
/// Returns `false` when `method` does not parse as a [`Method`] or when the
/// session has no namespace.
pub fn supports_method(&self, method: &str) -> bool {
    let wanted = match method.parse::<Method>() {
        Ok(parsed) => parsed,
        Err(_) => return false,
    };
    self.state
        .borrow()
        .session
        .namespace()
        .map_or(false, |ns| ns.methods.contains(&wanted))
}
/// Returns the first account the session holds for the current chain id.
///
/// `None` when there are no accounts at all or none for `chain_id()`.
pub fn get_account(&self) -> Option<H160> {
    self.get_accounts_for_chain_id(self.chain_id())?.first().copied()
}
/// Returns a copy of all accounts of the session namespace, for every chain.
///
/// `None` when the session has no namespace or the namespace has no accounts.
pub fn get_accounts(&self) -> Option<Vec<SessionAccount>> {
    self.state.borrow().session.namespace().and_then(|ns| ns.accounts.clone())
}
/// Returns whatever the current session reports from its
/// `available_networks()` method.
pub fn available_networks(&self) -> Vec<u64> {
self.state.borrow().session.available_networks()
}
/// Returns the addresses of the session accounts on EIP-155 chain `chain_id`.
///
/// `None` when the session has no namespace, the namespace has no account
/// list, or the list is empty. When accounts exist but none is on the given
/// chain the result is `Some` of an empty vector.
pub fn get_accounts_for_chain_id(&self, chain_id: u64) -> Option<Vec<Address>> {
    let client = self.state.borrow();
    let namespace = client.session.namespace()?;
    let accounts = namespace.accounts.as_ref()?;
    if accounts.is_empty() {
        return None;
    }
    let wanted = metadata::Chain::Eip155(chain_id);
    let on_chain = accounts
        .iter()
        .filter(|acc| acc.chain == wanted)
        .map(|acc| acc.account)
        .collect::<Vec<_>>();
    Some(on_chain)
}
/// Returns the chain id stored on the current session.
///
/// NOTE(review): this reads `session.chain_id`, not the `chain_id` field that
/// `set_chain_id` writes — confirm which one is meant to be authoritative.
pub fn chain_id(&self) -> u64 {
self.state.borrow().session.chain_id
}
/// Returns the account from `get_account`, or the zero address when there is
/// none.
pub fn address(&self) -> Address {
    self.get_account().unwrap_or_else(H160::zero)
}
/// Starts a session, either by resuming known topics or by creating a pairing.
///
/// With `initial_topics`, every topic is subscribed to in order and an empty
/// string is returned. Without them, a new topic and key are generated, the
/// proposer's public key is stored on the session, the topic is subscribed to,
/// the state becomes [`State::InitialSubscription`] and the returned string is
/// the pairing URI `wc:<topic>@2?relay-protocol=irn&symKey=<key>`.
///
/// # Errors
///
/// Propagates any error from `subscribe`.
pub async fn initiate_session(
    &self,
    initial_topics: Option<Vec<Topic>>,
) -> Result<String, Error> {
    if let Some(topics) = initial_topics {
        for topic in topics {
            self.subscribe(topic).await?;
        }
        return Ok(String::new());
    }

    // Keep the mutable borrow inside this block so it is released before the
    // subscription is awaited.
    let (topic, key) = {
        let mut client = (*self.state).borrow_mut();
        let (topic, key) = client.cipher.generate();
        let pub_key = PublicKey::from(&key);
        client.session.proposer.public_key = DecodedClientId::from_key(&pub_key).to_hex();
        (topic, key)
    };

    self.subscribe(topic.clone()).await?;
    (*self.state).borrow_mut().state = State::InitialSubscription(topic.clone());

    Ok(format!(
        "wc:{}@2?relay-protocol=irn&symKey={}",
        topic,
        DecodedSymKey::from_key(&key.to_bytes())
    ))
}
/// Sends a subscribe request for `topic` to the relay.
///
/// The acknowledgement is handled later, when it is read from the stream.
pub async fn subscribe(&self, topic: Topic) -> Result<(), Error> {
    let request = rpc::Subscribe { topic };
    self.send(&request).await
}
pub async fn next_from_stream(&self) -> Result<Response, Error> {
let mut stream = (*self.stream).borrow_mut();
match stream.next().await {
Some(Ok(Message::Bytes(_))) => Err(Error::BadResponse),
Some(Ok(Message::Text(text))) => Ok(serde_json::from_str::<Response>(&text)?),
Some(Err(err)) => {
error!("{}", err);
Err(Error::BadResponse)
}
None => Err(Error::Disconnected),
}
}
/// Processes one incoming relay message and reports what changed, if anything.
///
/// Connection status, chain id and accounts are sampled before and after the
/// message is handled. The first difference found, in that order, is returned
/// as an event; `Ok(None)` means nothing observable changed. Failures while
/// handling the message itself are ignored (decryption failures are logged and
/// reported to the relay as `false`).
///
/// Any failure to read a message from the stream yields
/// `Ok(Some(Event::Broken))`.
///
/// # Errors
///
/// Returns `Error::Disconnected` when the client is already in
/// [`State::Disconnected`].
pub async fn next(&self) -> Result<Option<event::Event>, Error> {
    let state_before = (*self.state).borrow().state.clone();
    if state_before == State::Disconnected {
        return Err(Error::Disconnected);
    }
    let chain_before = self.chain_id();
    let accounts_before = self.get_accounts_for_chain_id(chain_before);
    let connected_before = state_before.is_connected();

    let incoming = match self.next_from_stream().await {
        Ok(incoming) => incoming,
        Err(_) => {
            error!("We've got disconnected");
            return Ok(Some(event::Event::Broken));
        }
    };
    match incoming {
        Response::Success(resp) => {
            _ = self.process_response(&resp).await;
        }
        Response::Error(err) => {
            _ = self.process_error_response(&err).await;
        }
        Response::RPCResponse(req) => {
            let handled = match self.decrypt_params(req.params).await {
                Ok(_) => true,
                Err(err) => {
                    error!("Failed to receive {err:?}");
                    false
                }
            };
            _ = self.respond(req.id, handled).await;
        }
    }

    let connected_now = (*self.state).borrow().state.is_connected();
    if connected_before != connected_now {
        let event = if connected_now {
            event::Event::Connected
        } else {
            event::Event::Disconnected
        };
        return Ok(Some(event));
    }

    let chain_now = self.chain_id();
    if chain_before != chain_now {
        return Ok(Some(event::Event::ChainIdChanged(chain_now)));
    }

    let accounts_now = self.get_accounts_for_chain_id(chain_now);
    if accounts_before != accounts_now {
        return Ok(Some(event::Event::AccountsChanged(accounts_now)));
    }
    Ok(None)
}
/// Encrypts `request` for `topic` and publishes it through the relay.
///
/// `ttl`, `tag` and `prompt` are passed on in the publish request; `ttl` is
/// sent as whole seconds.
///
/// Returns the id of the session request wrapped inside the published message.
///
/// # Errors
///
/// * `Error::BadParam` when `ttl` in seconds does not fit the relay's field.
/// * Any error from the cipher or from `send`.
pub async fn publish<T: rpc::SessionPayload>(
    &self,
    topic: &Topic,
    request: &T,
    ttl: Duration,
    tag: u32,
    prompt: bool,
) -> Result<MessageId, Error> {
    let id = self.id_generator.next();
    let ttl_secs = ttl.num_seconds().try_into().map_err(|_| Error::BadParam)?;
    let session_request = rpc::SessionRequest {
        id,
        jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
        params: request.clone().into_params(),
    };
    let payload = rpc::Payload::SessionRequest(session_request);
    let message = (*self.state).borrow().cipher.encode(topic, &payload)?;
    let envelope = rpc::Publish { topic: topic.clone(), message, ttl_secs, tag, prompt };
    self.send(&envelope).await?;
    Ok(id)
}
pub async fn request(
&self,
method: &str,
params: Option<serde_json::Value>,
chain_id: u64,
) -> Result<serde_json::Value, Error> {
let topic = match &(*self.state).borrow().state {
State::Connected(ref topic) => Ok(topic.clone()),
_ => Err(Error::Disconnected),
}?;
let message_id = self
.publish(
&topic,
&SessionRpcRequest::new(method, params, chain_id),
Duration::minutes(5),
TAG_SESSION_REQUEST_REQUEST,
true,
)
.await?;
let (tx, mut rx) = mpsc::unbounded::<WalletConnectResponse>();
(*self.state).borrow_mut().requests_pending.insert(message_id, tx);
let ret = rx.next().await;
match ret {
Some(value) => match value {
WalletConnectResponse::Value(v) => Ok(v),
WalletConnectResponse::Error(error) => Err(Error::WalletError(error)),
},
None => Err(Error::BadResponse),
}
}
/// Publishes a boolean session response to a message received from the wallet.
///
/// `id` is the id of the wallet's message being answered and `result` the
/// boolean to report. `ttl`, `tag` and `prompt` are passed on in the publish
/// request; `ttl` is sent as whole seconds.
///
/// # Errors
///
/// * `Error::BadParam` when `ttl` in seconds does not fit the relay's field.
/// * Any error from the cipher or from `send`.
pub async fn wallet_respond(
    &self,
    topic: &Topic,
    id: MessageId,
    result: bool,
    ttl: Duration,
    tag: u32,
    prompt: bool,
) -> Result<(), Error> {
    let ttl_secs = ttl.num_seconds().try_into().map_err(|_| Error::BadParam)?;
    let payload = rpc::SessionResponse {
        id,
        jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
        result: rpc::SessionResultParams::Boolean(result),
    };
    // Encode through a short-lived shared borrow (as `publish` does) instead of
    // deep-cloning the entire client state just to reach the cipher. The borrow
    // ends with this statement, well before the send is awaited.
    let message = (*self.state).borrow().cipher.encode(topic, &payload)?;
    let req = rpc::Publish { topic: topic.clone(), message, ttl_secs, tag, prompt };
    self.send(&req).await
}
/// Sends a relay request over the websocket and records it as pending.
///
/// The request's params are stored in `pending` under a freshly generated id
/// so that the relay's answer can be matched to it later.
///
/// # Errors
///
/// Returns the converted JSON error when the payload cannot be serialized (in
/// which case nothing is recorded as pending), or the converted websocket
/// error when the message cannot be written.
pub async fn send<T: RequestPayload>(&self, request: &T) -> Result<(), Error> {
    let id = self.id_generator.next();
    let params = request.clone().into_params();
    let payload = rpc::Payload::Request(rpc::Request {
        id,
        jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
        params: params.clone(),
    });
    // Serialize first: a request that cannot be serialized is never sent and
    // must not linger in `pending`.
    let serialized_payload = serde_json::to_string(&payload)?;
    // The mutable borrow of the shared state ends with this statement. Holding
    // it across the `.await` below would make every other task that touches the
    // state while the socket is not ready panic on a double borrow.
    (*self.state).borrow_mut().pending.insert(id, params);
    (*self.sink).borrow_mut().send(Message::Text(serialized_payload)).await?;
    Ok(())
}
/// Acknowledges relay message `id` with a successful response whose result is
/// the boolean `success`.
///
/// # Errors
///
/// Returns the converted JSON or websocket error when the acknowledgement
/// cannot be serialized or written.
pub async fn respond(&self, id: MessageId, success: bool) -> Result<(), Error> {
    let ack = SuccessfulResponse {
        id,
        jsonrpc: rpc::JSON_RPC_VERSION_STR.to_string(),
        result: serde_json::Value::Bool(success),
    };
    let text = serde_json::to_string(&Response::Success(ack))?;
    let mut sink = (*self.sink).borrow_mut();
    sink.send(Message::Text(text)).await?;
    Ok(())
}
/// Extracts topic and encrypted message from relay params and hands them to
/// `consume_message`.
///
/// Publish params carry both fields directly; subscription params carry them
/// under `data`.
async fn decrypt_params(&self, params: ResponseParams) -> Result<(), Error> {
    match params {
        ResponseParams::Subscription(sub) => {
            let delivery = &sub.data;
            self.consume_message(&delivery.topic, &delivery.message).await
        }
        ResponseParams::Publish(published) => {
            self.consume_message(&published.topic, &published.message).await
        }
    }
}
/// Decrypts a session message received on `topic` and acts on it.
///
/// * Error and plain responses are forwarded to the `request` call waiting on
///   the same id, if there is one.
/// * A responder public key triggers derivation of the common topic, moves the
///   state to [`State::SwitchingTopic`] and subscribes to the new topic.
/// * Wallet-initiated messages are delegated to `handle_message`.
/// * Any other result is only logged.
///
/// # Errors
///
/// Returns cipher errors, an invalid responder key error, or anything
/// propagated from `subscribe` / `handle_message`.
async fn consume_message(&self, topic: &Topic, payload: &str) -> Result<(), Error> {
    debug!(
        "Received message {:?}",
        (*self.state).borrow().cipher.decode_to_string(topic, payload)?
    );
    let request = (*self.state).borrow().cipher.decode(topic, payload)?;
    match request {
        rpc::SessionMessage::Error(session_error) => {
            // Take the sender out under a borrow that ends with this statement;
            // the state must not stay mutably borrowed across the `.await`.
            let waiter =
                (*self.state).borrow_mut().requests_pending.remove(&session_error.id);
            if let Some(mut tx) = waiter {
                _ = tx
                    .send(WalletConnectResponse::Error(
                        session_error.error.as_error_response(),
                    ))
                    .await;
            }
            Ok(())
        }
        rpc::SessionMessage::Response(response) => match response.result {
            rpc::SessionResultParams::Responder(responder) => {
                let sub_topic = {
                    let mut state = (*self.state).borrow_mut();
                    let (new_topic, _) = state.cipher.create_common_topic(
                        topic,
                        DecodedClientId::from_hex(&responder.responder_public_key)?,
                    )?;
                    state.state = State::SwitchingTopic(new_topic.clone());
                    new_topic
                };
                self.subscribe(sub_topic).await?;
                Ok(())
            }
            rpc::SessionResultParams::Response(resp) => {
                // Same as above: release the state before awaiting the channel.
                let waiter = (*self.state).borrow_mut().requests_pending.remove(&response.id);
                if let Some(mut tx) = waiter {
                    _ = tx.send(WalletConnectResponse::Value(resp)).await;
                }
                Ok(())
            }
            _ => {
                debug!("Received unhandled result: {:?}", response.result);
                Ok(())
            }
        },
        rpc::SessionMessage::Message(message) => {
            self.handle_message(topic, &message).await?;
            Ok(())
        }
    }
}
/// Handles the relay's successful answer to one of our pending requests.
///
/// Only answers to subscribe requests have an effect: the subscription is
/// recorded, and if it is the one the handshake is waiting for the state
/// advances — `InitialSubscription` becomes `SessionProposed` (and a session
/// proposal is published on that topic), `SwitchingTopic` becomes
/// `AwaitingSettlement`. Answers to unknown ids are ignored.
///
/// # Errors
///
/// Propagates errors from publishing the session proposal.
async fn process_response(&self, response: &SuccessfulResponse) -> Result<(), Error> {
    // Decide under the borrow, publish after it has been released.
    let proposal = {
        let mut client = (*self.state).borrow_mut();
        match client.pending.remove(&response.id) {
            Some(rpc::Params::Subscribe(sub)) => {
                let topic = sub.topic.clone();
                client.subscriptions.insert(topic.clone(), response.result.to_string());

                let initial_confirmed = matches!(
                    &client.state,
                    State::InitialSubscription(awaited) if topic == *awaited
                );
                let switch_confirmed = matches!(
                    &client.state,
                    State::SwitchingTopic(awaited) if topic == *awaited
                );

                if initial_confirmed {
                    client.state = State::SessionProposed(topic.clone());
                    let propose: SessionPropose = client.session.clone().into();
                    Some((topic, propose))
                } else {
                    if switch_confirmed {
                        client.state = State::AwaitingSettlement(topic);
                    }
                    None
                }
            }
            _ => None,
        }
    };

    if let Some((topic, propose)) = proposal {
        _ = self
            .publish(&topic, &propose, Duration::minutes(5), TAG_SESSION_PROPOSE_REQUEST, true)
            .await?;
    }
    Ok(())
}
/// Handles the relay's error answer: the matching pending request, if any, is
/// dropped and the error is logged. Always returns `Ok(())`.
async fn process_error_response(&self, response: &ErrorResponse) -> Result<(), Error> {
    debug!("Error {response:?}");
    let was_pending = (*self.state).borrow_mut().pending.remove(&response.id).is_some();
    if was_pending {
        error!("Received error response from server {response:?}");
    }
    Ok(())
}
/// Applies a wallet-initiated session message and acknowledges it.
///
/// * `Ping` is ignored.
/// * `Settlement` is applied only while the state is
///   [`State::AwaitingSettlement`]; the state then becomes
///   [`State::Connected`] on the awaited topic. Otherwise it is ignored.
/// * `Update` and `Event` are applied to the session.
/// * `Delete` closes the session and sets [`State::Disconnected`].
///
/// Every message that was applied is answered with `true` on `topic`, using
/// the response tag that belongs to the message kind and a five-minute TTL.
///
/// # Errors
///
/// Propagates errors from `wallet_respond`.
async fn handle_message(
    &self,
    topic: &Topic,
    request: &rpc::WalletRequest,
) -> Result<(), Error> {
    // Each arm applies its change under a short-lived borrow and yields the tag
    // to acknowledge with, or `None` when no acknowledgement is due. The
    // response itself is sent once, after every borrow has been released.
    let reply_tag = match &request.params {
        rpc::WalletMessage::Ping(_) => None,
        rpc::WalletMessage::Settlement(settlement) => {
            let mut state = (*self.state).borrow_mut();
            if let State::AwaitingSettlement(settled_topic) = state.state.clone() {
                state.session.settle(settlement);
                state.state = State::Connected(settled_topic);
                // The expiry is only needed for the log line; a session without
                // one must not bring the client down.
                if let Some(expiry) = state.session.expiry {
                    let expires_in = expiry - Utc::now();
                    debug!(
                        "Session expires at {:?} that is in {:?} seconds",
                        state.session.expiry, expires_in
                    );
                }
                Some(TAG_SESSION_SETTLE_RESPONSE)
            } else {
                None
            }
        }
        rpc::WalletMessage::Update(update) => {
            (*self.state).borrow_mut().session.update(update);
            debug!("Updated, responding");
            Some(TAG_SESSION_UPDATE_RESPONSE)
        }
        rpc::WalletMessage::Event(event) => {
            (*self.state).borrow_mut().session.event(event);
            Some(TAG_SESSION_EVENT_RESPONSE)
        }
        rpc::WalletMessage::Delete(_) => {
            let mut state = (*self.state).borrow_mut();
            state.session.close();
            state.state = State::Disconnected;
            Some(TAG_SESSION_DELETE_RESPONSE)
        }
    };

    if let Some(tag) = reply_tag {
        self.wallet_respond(topic, request.id, true, Duration::minutes(5), tag, false)
            .await?;
    }
    Ok(())
}
}