Skip to main content

ckb_sdk/rpc/
mod.rs

1mod ckb;
2pub mod ckb_indexer;
3pub mod ckb_light_client;
4
5use anyhow::anyhow;
6
7#[cfg(not(target_arch = "wasm32"))]
8pub use ckb::CkbRpcClient;
9#[cfg(not(target_arch = "wasm32"))]
10pub use ckb_indexer::IndexerRpcClient;
11#[cfg(not(target_arch = "wasm32"))]
12pub use ckb_light_client::LightClientRpcClient;
13
14pub use ckb::CkbRpcAsyncClient;
15pub use ckb_indexer::IndexerRpcAsyncClient;
16use ckb_jsonrpc_types::{JsonBytes, ResponseFormat};
17pub use ckb_light_client::LightClientRpcAsyncClient;
18#[cfg(not(target_arch = "wasm32"))]
19use std::future::Future;
20use thiserror::Error;
21
22#[cfg(not(target_arch = "wasm32"))]
23pub(crate) fn block_on<F: Send>(future: impl Future<Output = F> + Send) -> F {
24    match tokio::runtime::Handle::try_current() {
25        Ok(h)
26            if matches!(
27                h.runtime_flavor(),
28                tokio::runtime::RuntimeFlavor::MultiThread
29            ) =>
30        {
31            tokio::task::block_in_place(|| h.block_on(future))
32        }
33        // if we on the current runtime, it must use another thread to poll this future,
34        // can't block on current runtime, it will block current reactor to stop forever
35        // in tokio runtime, this time will panic
36        Ok(_) => std::thread::scope(|s| {
37            s.spawn(|| {
38                tokio::runtime::Builder::new_current_thread()
39                    .enable_all()
40                    .build()
41                    .unwrap()
42                    .block_on(future)
43            })
44            .join()
45            .unwrap()
46        }),
47        Err(_) => tokio::runtime::Builder::new_current_thread()
48            .enable_all()
49            .build()
50            .unwrap()
51            .block_on(future),
52    }
53}
54
55#[derive(Error, Debug)]
56pub enum RpcError {
57    #[error("parse json error: `{0}`")]
58    Json(#[from] serde_json::Error),
59    #[error("http error: `{0}`")]
60    Http(#[from] reqwest::Error),
61    #[error("jsonrpc error: `{0}`")]
62    Rpc(#[from] jsonrpc_core::Error),
63    #[error(transparent)]
64    Other(#[from] anyhow::Error),
65    #[error("oneshot channel closed")]
66    ChannelClosed,
67}
68
69#[cfg(not(target_arch = "wasm32"))]
70#[macro_export]
71macro_rules! jsonrpc {
72    (
73        $(#[$struct_attr:meta])*
74        pub struct $struct_name:ident {$(
75            $(#[$attr:meta])*
76            pub fn $method:ident(& $selff:ident $(, $arg_name:ident: $arg_ty:ty)*)
77                -> $return_ty:ty;
78        )*}
79    ) => (
80        $(#[$struct_attr])*
81        pub struct $struct_name {
82            pub(crate) client: $crate::rpc::RpcClient,
83            pub(crate) id: std::sync::atomic::AtomicU64,
84        }
85
86        impl Clone for $struct_name {
87            fn clone(&self) -> Self {
88                Self {
89                    client: self.client.clone(),
90                    id: 0.into()
91                }
92            }
93        }
94
95        impl $struct_name {
96            pub fn new(uri: &str) -> Self {
97                $struct_name {  id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
98            }
99
100            pub fn new_with_cookie(uri: &str) -> Self {
101                $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new_with_cookie(uri), }
102            }
103
104            pub fn with_builder<F>(uri: &str, f: F) -> Result<Self, anyhow::Error>
105            where
106                F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
107            {
108                let client = $crate::rpc::RpcClient::with_builder(uri, f)?;
109                Ok($struct_name { id: 0.into(), client })
110            }
111
112            pub fn new_with_timeout(uri: &str, timeout: std::time::Duration) -> Result<Self, anyhow::Error> {
113                let client = $crate::rpc::RpcClient::new_with_timeout(uri, timeout)?;
114                Ok($struct_name { id: 0.into(), client })
115            }
116
117            pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->Result<RET, $crate::rpc::RpcError>
118            where
119                PARAM:serde::ser::Serialize + Send + 'static,
120                RET: serde::de::DeserializeOwned + Send + 'static,
121            {
122                let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
123                let params_fn = || -> Result<_,_> {
124                    let params = serde_json::to_value(params)?;
125                    let mut req_json = serde_json::Map::new();
126                    req_json.insert("id".to_owned(), serde_json::json!(id));
127                    req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
128                    req_json.insert("method".to_owned(), serde_json::json!(method));
129                    req_json.insert("params".to_owned(), params);
130                    Ok(req_json)
131                };
132
133                let task = self.client.post(params_fn);
134                $crate::rpc::block_on(task)
135
136            }
137
138            $(
139                $(#[$attr])*
140                pub fn $method(&$selff $(, $arg_name: $arg_ty)*) -> Result<$return_ty, $crate::rpc::RpcError> {
141                    let method = String::from(stringify!($method));
142                    let params = $crate::serialize_parameters!($($arg_name,)*);
143                    let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
144
145                    let params_fn = || -> Result<_,_> {
146                        let mut req_json = serde_json::Map::new();
147                        req_json.insert("id".to_owned(), serde_json::json!(id));
148                        req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
149                        req_json.insert("method".to_owned(), serde_json::json!(method));
150                        req_json.insert("params".to_owned(), params);
151                        Ok(req_json)
152                    };
153
154                    let task = $selff.client.post(params_fn);
155                    $crate::rpc::block_on(task)
156                }
157            )*
158        }
159    )
160}
161
162#[macro_export]
163macro_rules! jsonrpc_async {
164    (
165        $(#[$struct_attr:meta])*
166        pub struct $struct_name:ident {$(
167            $(#[$attr:meta])*
168            pub fn $method:ident(& $selff:ident $(, $arg_name:ident: $arg_ty:ty)*)
169                -> $return_ty:ty;
170        )*}
171    ) => (
172        $(#[$struct_attr])*
173        pub struct $struct_name {
174            pub(crate) client: $crate::rpc::RpcClient,
175            pub(crate) id: std::sync::atomic::AtomicU64,
176        }
177
178        impl Clone for $struct_name {
179            fn clone(&self) -> Self {
180                Self {
181                    client: self.client.clone(),
182                    id: 0.into()
183                }
184            }
185        }
186
187        impl $struct_name {
188            pub fn new(uri: &str) -> Self {
189                $struct_name {  id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
190            }
191
192            #[cfg(not(target_arch="wasm32"))]
193            pub fn new_with_cookie(uri: &str) -> Self {
194                $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new_with_cookie(uri), }
195            }
196
197            pub fn with_builder<F>(uri: &str, f: F) -> Result<Self, anyhow::Error>
198            where
199                F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
200            {
201                let client = $crate::rpc::RpcClient::with_builder(uri, f)?;
202                Ok($struct_name { id: 0.into(), client })
203            }
204
205            #[cfg(not(target_arch="wasm32"))]
206            pub fn new_with_timeout(uri: &str, timeout: std::time::Duration) -> Result<Self, anyhow::Error> {
207                let client = $crate::rpc::RpcClient::new_with_timeout(uri, timeout)?;
208                Ok($struct_name { id: 0.into(), client })
209            }
210
211            pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>> + Send + 'static
212            where
213                PARAM:serde::ser::Serialize + Send + 'static,
214                RET: serde::de::DeserializeOwned + Send + 'static,
215            {
216                let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
217                let method = serde_json::json!(method);
218
219                let params_fn = move || -> Result<_,_> {
220                    let params = serde_json::to_value(params)?;
221                    let mut req_json = serde_json::Map::new();
222                    req_json.insert("id".to_owned(), serde_json::json!(id));
223                    req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
224                    req_json.insert("method".to_owned(), method);
225                    req_json.insert("params".to_owned(), params);
226                    Ok(req_json)
227                };
228
229                self.client.post(params_fn)
230
231            }
232            $(
233                $(#[$attr])*
234                pub fn $method(&$selff $(, $arg_name: $arg_ty)*) -> impl std::future::Future<Output =Result<$return_ty, $crate::rpc::RpcError>> {
235                    let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
236
237                    let params_fn = move || -> Result<_,_> {
238                        let method = String::from(stringify!($method));
239                        let params = $crate::serialize_parameters!($($arg_name,)*);
240                        let mut req_json = serde_json::Map::new();
241                        req_json.insert("id".to_owned(), serde_json::json!(id));
242                        req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
243                        req_json.insert("method".to_owned(), serde_json::json!(method));
244                        req_json.insert("params".to_owned(), params);
245                        Ok(req_json)
246                    };
247
248                    $selff.client.post(params_fn)
249                }
250            )*
251        }
252    )
253}
254
255#[derive(Debug, Clone)]
256pub(crate) struct RpcClient {
257    client: reqwest::Client,
258    url: reqwest::Url,
259}
260
261impl RpcClient {
262    pub fn new(uri: &str) -> Self {
263        let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\"");
264        Self {
265            client: reqwest::Client::new(),
266            url,
267        }
268    }
269
270    pub fn with_builder<F>(uri: &str, f: F) -> Result<Self, anyhow::Error>
271    where
272        F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
273    {
274        let url = reqwest::Url::parse(uri)?;
275        let client = f(reqwest::Client::builder())
276            .build()
277            .map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))?;
278        Ok(Self { client, url })
279    }
280
281    #[cfg(not(target_arch = "wasm32"))]
282    pub fn new_with_timeout(
283        uri: &str,
284        timeout: std::time::Duration,
285    ) -> Result<Self, anyhow::Error> {
286        Self::with_builder(uri, |builder| builder.timeout(timeout))
287    }
288
289    #[cfg(not(target_arch = "wasm32"))]
290    pub fn new_with_cookie(uri: &str) -> Self {
291        let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\"");
292        let mut client_builder = reqwest::Client::builder();
293        client_builder = client_builder.cookie_store(true);
294        Self {
295            client: client_builder
296                .build()
297                .expect("failed to build reqwest client"),
298            url,
299        }
300    }
301
302    fn post_inner<PARAM, RET, T>(
303        &self,
304        json_post_params: T,
305    ) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>>
306    where
307        PARAM: serde::ser::Serialize + Send + 'static,
308        RET: serde::de::DeserializeOwned + Send + 'static,
309        T: FnOnce() -> Result<PARAM, crate::rpc::RpcError>,
310    {
311        let url = self.url.clone();
312        let client = self.client.clone();
313
314        async move {
315            let resp = client.post(url).json(&json_post_params()?).send().await?;
316            let output = resp.json::<jsonrpc_core::response::Output>().await?;
317            match output {
318                jsonrpc_core::response::Output::Success(success) => {
319                    serde_json::from_value(success.result).map_err(Into::into)
320                }
321                jsonrpc_core::response::Output::Failure(failure) => Err(failure.error.into()),
322            }
323        }
324    }
325
326    #[cfg(not(target_arch = "wasm32"))]
327    pub fn post<PARAM, RET, T>(
328        &self,
329        json_post_params: T,
330    ) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>> + Send
331    where
332        PARAM: serde::ser::Serialize + Send + 'static,
333        RET: serde::de::DeserializeOwned + Send + 'static,
334        T: FnOnce() -> Result<PARAM, crate::rpc::RpcError> + Send,
335    {
336        self.post_inner(json_post_params)
337    }
338
339    #[cfg(target_arch = "wasm32")]
340    pub fn post<PARAM, RET, T>(
341        &self,
342        json_post_params: T,
343    ) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>> + Send
344    where
345        PARAM: serde::ser::Serialize + Send + 'static,
346        RET: serde::de::DeserializeOwned + Send + 'static,
347        T: FnOnce() -> Result<PARAM, crate::rpc::RpcError> + Send + 'static,
348    {
349        let (tx, rx) = tokio::sync::oneshot::channel();
350        let future = self.post_inner(json_post_params);
351
352        tokio_with_wasm::spawn(async move {
353            let result = future.await;
354            let _ = tx.send(result);
355        });
356
357        async move { rx.await.map_err(|_| crate::rpc::RpcError::ChannelClosed)? }
358    }
359}
360
361#[macro_export]
362macro_rules! serialize_parameters {
363    () => ( serde_json::Value::Null );
364    ($($arg_name:ident,)+) => ( serde_json::to_value(($($arg_name,)+))?)
365}
366
367pub trait ResponseFormatGetter<V> {
368    fn get_value(self) -> Result<V, crate::rpc::RpcError>;
369    fn get_json_bytes(self) -> Result<JsonBytes, crate::rpc::RpcError>;
370}
371
372impl<V> ResponseFormatGetter<V> for ResponseFormat<V> {
373    fn get_value(self) -> Result<V, crate::rpc::RpcError> {
374        match self.inner {
375            ckb_jsonrpc_types::Either::Left(v) => Ok(v),
376            ckb_jsonrpc_types::Either::Right(_) => Err(crate::rpc::RpcError::Other(anyhow!(
377                "It's a JsonBytes, can't get the inner value directly"
378            ))),
379        }
380    }
381
382    fn get_json_bytes(self) -> Result<JsonBytes, crate::rpc::RpcError> {
383        match self.inner {
384            ckb_jsonrpc_types::Either::Left(_v) => Err(crate::rpc::RpcError::Other(anyhow!(
385                "It's not a JsonBytes, can't get the json bytes directly"
386            ))),
387            ckb_jsonrpc_types::Either::Right(json_bytes) => Ok(json_bytes),
388        }
389    }
390}
391
392#[cfg(test)]
393mod anyhow_tests {
394    use anyhow::anyhow;
395    #[test]
396    fn test_rpc_error() {
397        let json_rpc_error = jsonrpc_core::Error {
398            code: jsonrpc_core::ErrorCode::ParseError,
399            message: "parse error".to_string(),
400            data: None,
401        };
402        let error = super::RpcError::from(json_rpc_error);
403        let error = anyhow!(error);
404        println!("{}", error)
405    }
406}