1mod ckb;
2pub mod ckb_indexer;
3pub mod ckb_light_client;
4
5use anyhow::anyhow;
6
7#[cfg(not(target_arch = "wasm32"))]
8pub use ckb::CkbRpcClient;
9#[cfg(not(target_arch = "wasm32"))]
10pub use ckb_indexer::IndexerRpcClient;
11#[cfg(not(target_arch = "wasm32"))]
12pub use ckb_light_client::LightClientRpcClient;
13
14pub use ckb::CkbRpcAsyncClient;
15pub use ckb_indexer::IndexerRpcAsyncClient;
16use ckb_jsonrpc_types::{JsonBytes, ResponseFormat};
17pub use ckb_light_client::LightClientRpcAsyncClient;
18#[cfg(not(target_arch = "wasm32"))]
19use std::future::Future;
20use thiserror::Error;
21
22#[cfg(not(target_arch = "wasm32"))]
23pub(crate) fn block_on<F: Send>(future: impl Future<Output = F> + Send) -> F {
24 match tokio::runtime::Handle::try_current() {
25 Ok(h)
26 if matches!(
27 h.runtime_flavor(),
28 tokio::runtime::RuntimeFlavor::MultiThread
29 ) =>
30 {
31 tokio::task::block_in_place(|| h.block_on(future))
32 }
33 Ok(_) => std::thread::scope(|s| {
37 s.spawn(|| {
38 tokio::runtime::Builder::new_current_thread()
39 .enable_all()
40 .build()
41 .unwrap()
42 .block_on(future)
43 })
44 .join()
45 .unwrap()
46 }),
47 Err(_) => tokio::runtime::Builder::new_current_thread()
48 .enable_all()
49 .build()
50 .unwrap()
51 .block_on(future),
52 }
53}
54
55#[derive(Error, Debug)]
56pub enum RpcError {
57 #[error("parse json error: `{0}`")]
58 Json(#[from] serde_json::Error),
59 #[error("http error: `{0}`")]
60 Http(#[from] reqwest::Error),
61 #[error("jsonrpc error: `{0}`")]
62 Rpc(#[from] jsonrpc_core::Error),
63 #[error(transparent)]
64 Other(#[from] anyhow::Error),
65 #[error("oneshot channel closed")]
66 ChannelClosed,
67}
68
69#[cfg(not(target_arch = "wasm32"))]
70#[macro_export]
71macro_rules! jsonrpc {
72 (
73 $(#[$struct_attr:meta])*
74 pub struct $struct_name:ident {$(
75 $(#[$attr:meta])*
76 pub fn $method:ident(& $selff:ident $(, $arg_name:ident: $arg_ty:ty)*)
77 -> $return_ty:ty;
78 )*}
79 ) => (
80 $(#[$struct_attr])*
81 pub struct $struct_name {
82 pub(crate) client: $crate::rpc::RpcClient,
83 pub(crate) id: std::sync::atomic::AtomicU64,
84 }
85
86 impl Clone for $struct_name {
87 fn clone(&self) -> Self {
88 Self {
89 client: self.client.clone(),
90 id: 0.into()
91 }
92 }
93 }
94
95 impl $struct_name {
96 pub fn new(uri: &str) -> Self {
97 $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
98 }
99
100 pub fn new_with_cookie(uri: &str) -> Self {
101 $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new_with_cookie(uri), }
102 }
103
104 pub fn with_builder<F>(uri: &str, f: F) -> Result<Self, anyhow::Error>
105 where
106 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
107 {
108 let client = $crate::rpc::RpcClient::with_builder(uri, f)?;
109 Ok($struct_name { id: 0.into(), client })
110 }
111
112 pub fn new_with_timeout(uri: &str, timeout: std::time::Duration) -> Result<Self, anyhow::Error> {
113 let client = $crate::rpc::RpcClient::new_with_timeout(uri, timeout)?;
114 Ok($struct_name { id: 0.into(), client })
115 }
116
117 pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->Result<RET, $crate::rpc::RpcError>
118 where
119 PARAM:serde::ser::Serialize + Send + 'static,
120 RET: serde::de::DeserializeOwned + Send + 'static,
121 {
122 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
123 let params_fn = || -> Result<_,_> {
124 let params = serde_json::to_value(params)?;
125 let mut req_json = serde_json::Map::new();
126 req_json.insert("id".to_owned(), serde_json::json!(id));
127 req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
128 req_json.insert("method".to_owned(), serde_json::json!(method));
129 req_json.insert("params".to_owned(), params);
130 Ok(req_json)
131 };
132
133 let task = self.client.post(params_fn);
134 $crate::rpc::block_on(task)
135
136 }
137
138 $(
139 $(#[$attr])*
140 pub fn $method(&$selff $(, $arg_name: $arg_ty)*) -> Result<$return_ty, $crate::rpc::RpcError> {
141 let method = String::from(stringify!($method));
142 let params = $crate::serialize_parameters!($($arg_name,)*);
143 let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
144
145 let params_fn = || -> Result<_,_> {
146 let mut req_json = serde_json::Map::new();
147 req_json.insert("id".to_owned(), serde_json::json!(id));
148 req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
149 req_json.insert("method".to_owned(), serde_json::json!(method));
150 req_json.insert("params".to_owned(), params);
151 Ok(req_json)
152 };
153
154 let task = $selff.client.post(params_fn);
155 $crate::rpc::block_on(task)
156 }
157 )*
158 }
159 )
160}
161
162#[macro_export]
163macro_rules! jsonrpc_async {
164 (
165 $(#[$struct_attr:meta])*
166 pub struct $struct_name:ident {$(
167 $(#[$attr:meta])*
168 pub fn $method:ident(& $selff:ident $(, $arg_name:ident: $arg_ty:ty)*)
169 -> $return_ty:ty;
170 )*}
171 ) => (
172 $(#[$struct_attr])*
173 pub struct $struct_name {
174 pub(crate) client: $crate::rpc::RpcClient,
175 pub(crate) id: std::sync::atomic::AtomicU64,
176 }
177
178 impl Clone for $struct_name {
179 fn clone(&self) -> Self {
180 Self {
181 client: self.client.clone(),
182 id: 0.into()
183 }
184 }
185 }
186
187 impl $struct_name {
188 pub fn new(uri: &str) -> Self {
189 $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new(uri), }
190 }
191
192 #[cfg(not(target_arch="wasm32"))]
193 pub fn new_with_cookie(uri: &str) -> Self {
194 $struct_name { id: 0.into(), client: $crate::rpc::RpcClient::new_with_cookie(uri), }
195 }
196
197 pub fn with_builder<F>(uri: &str, f: F) -> Result<Self, anyhow::Error>
198 where
199 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
200 {
201 let client = $crate::rpc::RpcClient::with_builder(uri, f)?;
202 Ok($struct_name { id: 0.into(), client })
203 }
204
205 #[cfg(not(target_arch="wasm32"))]
206 pub fn new_with_timeout(uri: &str, timeout: std::time::Duration) -> Result<Self, anyhow::Error> {
207 let client = $crate::rpc::RpcClient::new_with_timeout(uri, timeout)?;
208 Ok($struct_name { id: 0.into(), client })
209 }
210
211 pub fn post<PARAM, RET>(&self, method:&str, params: PARAM)->impl std::future::Future<Output =Result<RET, $crate::rpc::RpcError>> + Send + 'static
212 where
213 PARAM:serde::ser::Serialize + Send + 'static,
214 RET: serde::de::DeserializeOwned + Send + 'static,
215 {
216 let id = self.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
217 let method = serde_json::json!(method);
218
219 let params_fn = move || -> Result<_,_> {
220 let params = serde_json::to_value(params)?;
221 let mut req_json = serde_json::Map::new();
222 req_json.insert("id".to_owned(), serde_json::json!(id));
223 req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
224 req_json.insert("method".to_owned(), method);
225 req_json.insert("params".to_owned(), params);
226 Ok(req_json)
227 };
228
229 self.client.post(params_fn)
230
231 }
232 $(
233 $(#[$attr])*
234 pub fn $method(&$selff $(, $arg_name: $arg_ty)*) -> impl std::future::Future<Output =Result<$return_ty, $crate::rpc::RpcError>> {
235 let id = $selff.id.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
236
237 let params_fn = move || -> Result<_,_> {
238 let method = String::from(stringify!($method));
239 let params = $crate::serialize_parameters!($($arg_name,)*);
240 let mut req_json = serde_json::Map::new();
241 req_json.insert("id".to_owned(), serde_json::json!(id));
242 req_json.insert("jsonrpc".to_owned(), serde_json::json!("2.0"));
243 req_json.insert("method".to_owned(), serde_json::json!(method));
244 req_json.insert("params".to_owned(), params);
245 Ok(req_json)
246 };
247
248 $selff.client.post(params_fn)
249 }
250 )*
251 }
252 )
253}
254
255#[derive(Debug, Clone)]
256pub(crate) struct RpcClient {
257 client: reqwest::Client,
258 url: reqwest::Url,
259}
260
261impl RpcClient {
262 pub fn new(uri: &str) -> Self {
263 let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\"");
264 Self {
265 client: reqwest::Client::new(),
266 url,
267 }
268 }
269
270 pub fn with_builder<F>(uri: &str, f: F) -> Result<Self, anyhow::Error>
271 where
272 F: FnOnce(reqwest::ClientBuilder) -> reqwest::ClientBuilder,
273 {
274 let url = reqwest::Url::parse(uri)?;
275 let client = f(reqwest::Client::builder())
276 .build()
277 .map_err(|e| anyhow::anyhow!("Failed to build HTTP client: {}", e))?;
278 Ok(Self { client, url })
279 }
280
281 #[cfg(not(target_arch = "wasm32"))]
282 pub fn new_with_timeout(
283 uri: &str,
284 timeout: std::time::Duration,
285 ) -> Result<Self, anyhow::Error> {
286 Self::with_builder(uri, |builder| builder.timeout(timeout))
287 }
288
289 #[cfg(not(target_arch = "wasm32"))]
290 pub fn new_with_cookie(uri: &str) -> Self {
291 let url = reqwest::Url::parse(uri).expect("ckb uri, e.g. \"http://127.0.0.1:8114\"");
292 let mut client_builder = reqwest::Client::builder();
293 client_builder = client_builder.cookie_store(true);
294 Self {
295 client: client_builder
296 .build()
297 .expect("failed to build reqwest client"),
298 url,
299 }
300 }
301
302 fn post_inner<PARAM, RET, T>(
303 &self,
304 json_post_params: T,
305 ) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>>
306 where
307 PARAM: serde::ser::Serialize + Send + 'static,
308 RET: serde::de::DeserializeOwned + Send + 'static,
309 T: FnOnce() -> Result<PARAM, crate::rpc::RpcError>,
310 {
311 let url = self.url.clone();
312 let client = self.client.clone();
313
314 async move {
315 let resp = client.post(url).json(&json_post_params()?).send().await?;
316 let output = resp.json::<jsonrpc_core::response::Output>().await?;
317 match output {
318 jsonrpc_core::response::Output::Success(success) => {
319 serde_json::from_value(success.result).map_err(Into::into)
320 }
321 jsonrpc_core::response::Output::Failure(failure) => Err(failure.error.into()),
322 }
323 }
324 }
325
326 #[cfg(not(target_arch = "wasm32"))]
327 pub fn post<PARAM, RET, T>(
328 &self,
329 json_post_params: T,
330 ) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>> + Send
331 where
332 PARAM: serde::ser::Serialize + Send + 'static,
333 RET: serde::de::DeserializeOwned + Send + 'static,
334 T: FnOnce() -> Result<PARAM, crate::rpc::RpcError> + Send,
335 {
336 self.post_inner(json_post_params)
337 }
338
339 #[cfg(target_arch = "wasm32")]
340 pub fn post<PARAM, RET, T>(
341 &self,
342 json_post_params: T,
343 ) -> impl std::future::Future<Output = Result<RET, crate::rpc::RpcError>> + Send
344 where
345 PARAM: serde::ser::Serialize + Send + 'static,
346 RET: serde::de::DeserializeOwned + Send + 'static,
347 T: FnOnce() -> Result<PARAM, crate::rpc::RpcError> + Send + 'static,
348 {
349 let (tx, rx) = tokio::sync::oneshot::channel();
350 let future = self.post_inner(json_post_params);
351
352 tokio_with_wasm::spawn(async move {
353 let result = future.await;
354 let _ = tx.send(result);
355 });
356
357 async move { rx.await.map_err(|_| crate::rpc::RpcError::ChannelClosed)? }
358 }
359}
360
361#[macro_export]
362macro_rules! serialize_parameters {
363 () => ( serde_json::Value::Null );
364 ($($arg_name:ident,)+) => ( serde_json::to_value(($($arg_name,)+))?)
365}
366
367pub trait ResponseFormatGetter<V> {
368 fn get_value(self) -> Result<V, crate::rpc::RpcError>;
369 fn get_json_bytes(self) -> Result<JsonBytes, crate::rpc::RpcError>;
370}
371
372impl<V> ResponseFormatGetter<V> for ResponseFormat<V> {
373 fn get_value(self) -> Result<V, crate::rpc::RpcError> {
374 match self.inner {
375 ckb_jsonrpc_types::Either::Left(v) => Ok(v),
376 ckb_jsonrpc_types::Either::Right(_) => Err(crate::rpc::RpcError::Other(anyhow!(
377 "It's a JsonBytes, can't get the inner value directly"
378 ))),
379 }
380 }
381
382 fn get_json_bytes(self) -> Result<JsonBytes, crate::rpc::RpcError> {
383 match self.inner {
384 ckb_jsonrpc_types::Either::Left(_v) => Err(crate::rpc::RpcError::Other(anyhow!(
385 "It's not a JsonBytes, can't get the json bytes directly"
386 ))),
387 ckb_jsonrpc_types::Either::Right(json_bytes) => Ok(json_bytes),
388 }
389 }
390}
391
392#[cfg(test)]
393mod anyhow_tests {
394 use anyhow::anyhow;
395 #[test]
396 fn test_rpc_error() {
397 let json_rpc_error = jsonrpc_core::Error {
398 code: jsonrpc_core::ErrorCode::ParseError,
399 message: "parse error".to_string(),
400 data: None,
401 };
402 let error = super::RpcError::from(json_rpc_error);
403 let error = anyhow!(error);
404 println!("{}", error)
405 }
406}