subxt_rpcs/client/
jsonrpsee_impl.rs1use super::{RawRpcFuture, RawRpcSubscription, RpcClientT};
6use crate::Error;
7use futures::stream::{StreamExt, TryStreamExt};
8use jsonrpsee::{
9 core::{
10 client::{Error as JsonrpseeError, Client, ClientT, SubscriptionClientT, SubscriptionKind},
11 traits::ToRpcParams,
12 },
13 types::SubscriptionId,
14};
15use serde_json::value::RawValue;
16
17pub async fn client(url: &str) -> Result<Client, Error> {
19 jsonrpsee_helpers::client(url).await.map_err(|e| Error::Client(Box::new(e)))
20}
21
22struct Params(Option<Box<RawValue>>);
23
24impl ToRpcParams for Params {
25 fn to_rpc_params(self) -> Result<Option<Box<RawValue>>, serde_json::Error> {
26 Ok(self.0)
27 }
28}
29
30impl RpcClientT for Client {
31 fn request_raw<'a>(
32 &'a self,
33 method: &'a str,
34 params: Option<Box<RawValue>>,
35 ) -> RawRpcFuture<'a, Box<RawValue>> {
36 Box::pin(async move {
37 let res = ClientT::request(self, method, Params(params)).await?;
38 Ok(res)
39 })
40 }
41
42 fn subscribe_raw<'a>(
43 &'a self,
44 sub: &'a str,
45 params: Option<Box<RawValue>>,
46 unsub: &'a str,
47 ) -> RawRpcFuture<'a, RawRpcSubscription> {
48 Box::pin(async move {
49 let stream = SubscriptionClientT::subscribe::<Box<RawValue>, _>(
50 self,
51 sub,
52 Params(params),
53 unsub,
54 ).await?;
55
56 let id = match stream.kind() {
57 SubscriptionKind::Subscription(SubscriptionId::Str(id)) => {
58 Some(id.clone().into_owned())
59 }
60 _ => None,
61 };
62
63 let stream = stream
64 .map_err(|e| Error::Client(Box::new(e)))
65 .boxed();
66 Ok(RawRpcSubscription { stream, id })
67 })
68 }
69}
70
71impl From<JsonrpseeError> for Error {
75 fn from(error: JsonrpseeError) -> Self {
76 match error {
77 JsonrpseeError::Call(e) => {
78 Error::User(crate::UserError {
79 code: e.code(),
80 message: e.message().to_owned(),
81 data: e.data().map(|d| d.to_owned())
82 })
83 },
84 e => {
85 Error::Client(Box::new(e))
86 }
87 }
88 }
89}
90
#[cfg(all(feature = "jsonrpsee", feature = "native"))]
mod jsonrpsee_helpers {
    //! Helpers for building a jsonrpsee client over the `ws` transport,
    //! compiled when both the `jsonrpsee` and `native` features are enabled.

    pub use jsonrpsee::{
        client_transport::ws::{self, EitherStream, Url, WsTransportClientBuilder},
        core::client::{Client, Error},
    };
    use tokio_util::compat::Compat;

    /// Sending half of the transport returned by `ws_transport`.
    pub type Sender = ws::Sender<Compat<EitherStream>>;
    /// Receiving half of the transport returned by `ws_transport`.
    pub type Receiver = ws::Receiver<Compat<EitherStream>>;

    /// Connect to `url` and build a [`Client`] on top of the resulting
    /// transport, with `max_buffer_capacity_per_subscription` set to 4096.
    pub async fn client(url: &str) -> Result<Client, Error> {
        let (tx, rx) = ws_transport(url).await?;
        let built = Client::builder()
            .max_buffer_capacity_per_subscription(4096)
            .build_with_tokio(tx, rx);
        Ok(built)
    }

    /// Parse `url` and build the sender/receiver pair with a default
    /// `WsTransportClientBuilder`.
    ///
    /// Both URL parse failures and build failures are reported as
    /// `Error::Transport`.
    async fn ws_transport(url: &str) -> Result<(Sender, Receiver), Error> {
        let parsed = Url::parse(url).map_err(|parse_err| Error::Transport(parse_err.into()))?;
        let builder = WsTransportClientBuilder::default();
        let halves = builder
            .build(parsed)
            .await
            .map_err(|build_err| Error::Transport(build_err.into()))?;
        Ok(halves)
    }
}
119
#[cfg(all(feature = "jsonrpsee", feature = "web", target_arch = "wasm32"))]
mod jsonrpsee_helpers {
    //! Helpers for building a jsonrpsee client over the `web` transport,
    //! compiled for `wasm32` when the `jsonrpsee` and `web` features are enabled.

    pub use jsonrpsee::{
        client_transport::web,
        core::client::{Client, ClientBuilder, Error},
    };

    /// Connect to `url` via `web::connect` and build a [`Client`] on top of
    /// the resulting transport, with `max_buffer_capacity_per_subscription`
    /// set to 4096.
    ///
    /// A connection failure is reported as `Error::Transport`.
    pub async fn client(url: &str) -> Result<Client, Error> {
        let connection = web::connect(url).await;
        let (tx, rx) = connection.map_err(|connect_err| Error::Transport(connect_err.into()))?;
        let built = ClientBuilder::default()
            .max_buffer_capacity_per_subscription(4096)
            .build_with_wasm(tx, rx);
        Ok(built)
    }
}