1use crate::{
49 error::rpc::RpcClientError,
50 strings::rpc::{methods, storage_keys},
51};
52use scale::{Decode, Encode};
53use std::sync::Arc;
54use subxt::{
55 Metadata, SubstrateConfig,
56 backend::{
57 legacy::{LegacyRpcMethods, rpc_methods::Block},
58 rpc::RpcClient,
59 },
60 config::substrate::H256,
61};
62use tokio::sync::{Mutex, RwLock, Semaphore};
63use url::Url;
64
/// Connection timeout, in seconds, applied when opening the WebSocket to the upstream node.
const WS_CONNECT_TIMEOUT_SECS: u64 = 30;

/// Number of permits in the semaphore that gates `storage_batch` and `storage_keys_paged`,
/// i.e. how many of those calls may be in flight at once per client (clones share the permits).
const MAX_CONCURRENT_UPSTREAM_CALLS: usize = 4;

/// Lowest metadata version requested when the default metadata blob cannot be decoded.
const METADATA_V14: u32 = 14;
/// Highest metadata version requested in that fallback; versions are tried from this one down.
const METADATA_LATEST: u32 = 15;
82
/// Client for querying an upstream node over WebSocket using subxt's legacy RPC methods.
///
/// Cloning is cheap: the connection, the semaphore and the reconnect lock all sit behind
/// `Arc`, so every clone talks through the same connection and observes a reconnect
/// performed by any other clone.
#[derive(Clone)]
pub struct ForkRpcClient {
    /// Current connection. Replaced wholesale by `reconnect` under the write lock;
    /// every request takes the read lock for the duration of the call.
    legacy: Arc<RwLock<LegacyRpcMethods<SubstrateConfig>>>,
    /// URL the client was connected to; reused when reconnecting.
    endpoint: Url,
    /// Gates `storage_batch` and `storage_keys_paged` so that at most
    /// `MAX_CONCURRENT_UPSTREAM_CALLS` of them run concurrently.
    upstream_semaphore: Arc<Semaphore>,
    /// Serializes `reconnect` so only one task at a time checks and replaces the connection.
    reconnect_lock: Arc<Mutex<()>>,
}
111
112impl std::fmt::Debug for ForkRpcClient {
113 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114 f.debug_struct("ForkRpcClient").field("endpoint", &self.endpoint).finish()
115 }
116}
117
118impl ForkRpcClient {
119 pub async fn connect(endpoint: &Url) -> Result<Self, RpcClientError> {
129 let legacy = Self::create_connection(endpoint).await?;
130 Ok(Self {
131 legacy: Arc::new(RwLock::new(legacy)),
132 endpoint: endpoint.clone(),
133 upstream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_UPSTREAM_CALLS)),
134 reconnect_lock: Arc::new(Mutex::new(())),
135 })
136 }
137
138 async fn create_connection(
144 endpoint: &Url,
145 ) -> Result<LegacyRpcMethods<SubstrateConfig>, RpcClientError> {
146 use jsonrpsee::ws_client::WsClientBuilder;
147
148 let client = WsClientBuilder::default()
149 .max_response_size(u32::MAX)
150 .connection_timeout(std::time::Duration::from_secs(WS_CONNECT_TIMEOUT_SECS))
151 .build(endpoint.as_str())
152 .await
153 .map_err(|e| RpcClientError::ConnectionFailed {
154 endpoint: endpoint.to_string(),
155 message: e.to_string(),
156 })?;
157 let rpc_client = RpcClient::new(client);
158 Ok(LegacyRpcMethods::new(rpc_client))
159 }
160
161 pub async fn reconnect(&self) -> Result<(), RpcClientError> {
170 let _guard = self.reconnect_lock.lock().await;
171
172 if self.legacy.read().await.system_chain().await.is_ok() {
175 return Ok(());
176 }
177
178 let new_legacy = Self::create_connection(&self.endpoint).await?;
179 *self.legacy.write().await = new_legacy;
180 Ok(())
181 }
182
183 pub fn endpoint(&self) -> &Url {
185 &self.endpoint
186 }
187
188 pub async fn finalized_head(&self) -> Result<H256, RpcClientError> {
193 self.legacy.read().await.chain_get_finalized_head().await.map_err(|e| {
194 RpcClientError::RequestFailed {
195 method: methods::CHAIN_GET_FINALIZED_HEAD,
196 message: e.to_string(),
197 }
198 })
199 }
200
201 pub async fn header(
206 &self,
207 hash: H256,
208 ) -> Result<<SubstrateConfig as subxt::Config>::Header, RpcClientError> {
209 self.legacy
210 .read()
211 .await
212 .chain_get_header(Some(hash))
213 .await
214 .map_err(|e| RpcClientError::RequestFailed {
215 method: methods::CHAIN_GET_HEADER,
216 message: e.to_string(),
217 })?
218 .ok_or_else(|| RpcClientError::InvalidResponse(format!("No header found for {hash:?}")))
219 }
220
221 pub async fn block_hash_at(&self, block_number: u32) -> Result<Option<H256>, RpcClientError> {
231 self.legacy
232 .read()
233 .await
234 .chain_get_block_hash(Some(block_number.into()))
235 .await
236 .map_err(|e| RpcClientError::RequestFailed {
237 method: methods::CHAIN_GET_BLOCK_HASH,
238 message: e.to_string(),
239 })
240 }
241
242 pub async fn block_by_number(
255 &self,
256 block_number: u32,
257 ) -> Result<Option<(H256, Block<SubstrateConfig>)>, RpcClientError> {
258 let block_hash = self.block_hash_at(block_number).await?;
260
261 let block_hash = match block_hash {
262 Some(hash) => hash,
263 None => return Ok(None),
264 };
265
266 let block =
268 self.legacy.read().await.chain_get_block(Some(block_hash)).await.map_err(|e| {
269 RpcClientError::RequestFailed {
270 method: methods::CHAIN_GET_BLOCK,
271 message: e.to_string(),
272 }
273 })?;
274
275 Ok(block.map(|block| (block_hash, block.block)))
276 }
277
278 pub async fn block_by_hash(
288 &self,
289 block_hash: H256,
290 ) -> Result<Option<Block<SubstrateConfig>>, RpcClientError> {
291 let block =
292 self.legacy.read().await.chain_get_block(Some(block_hash)).await.map_err(|e| {
293 RpcClientError::RequestFailed {
294 method: methods::CHAIN_GET_BLOCK,
295 message: e.to_string(),
296 }
297 })?;
298
299 Ok(block.map(|b| b.block))
300 }
301
302 pub async fn storage(&self, key: &[u8], at: H256) -> Result<Option<Vec<u8>>, RpcClientError> {
313 self.legacy.read().await.state_get_storage(key, Some(at)).await.map_err(|e| {
314 RpcClientError::RequestFailed {
315 method: methods::STATE_GET_STORAGE,
316 message: e.to_string(),
317 }
318 })
319 }
320
321 pub async fn storage_batch(
333 &self,
334 keys: &[&[u8]],
335 at: H256,
336 ) -> Result<Vec<Option<Vec<u8>>>, RpcClientError> {
337 if keys.is_empty() {
338 return Ok(vec![]);
339 }
340
341 let _permit = self.upstream_semaphore.acquire().await.expect("semaphore closed");
342
343 let result = self
344 .legacy
345 .read()
346 .await
347 .state_query_storage_at(keys.iter().copied(), Some(at))
348 .await
349 .map_err(|e| RpcClientError::RequestFailed {
350 method: methods::STATE_QUERY_STORAGE_AT,
351 message: e.to_string(),
352 })?;
353
354 let changes: std::collections::HashMap<Vec<u8>, Option<Vec<u8>>> = result
356 .into_iter()
357 .flat_map(|change_set| {
358 change_set.changes.into_iter().map(|(k, v)| {
359 let key_bytes = k.0.to_vec();
360 let value_bytes = v.map(|v| v.0.to_vec());
361 (key_bytes, value_bytes)
362 })
363 })
364 .collect();
365
366 let values = keys.iter().map(|key| changes.get::<[u8]>(key).cloned().flatten()).collect();
368
369 Ok(values)
370 }
371
372 pub async fn storage_keys_paged(
382 &self,
383 prefix: &[u8],
384 count: u32,
385 start_key: Option<&[u8]>,
386 at: H256,
387 ) -> Result<Vec<Vec<u8>>, RpcClientError> {
388 let _permit = self.upstream_semaphore.acquire().await.expect("semaphore closed");
389
390 self.legacy
391 .read()
392 .await
393 .state_get_keys_paged(prefix, count, start_key, Some(at))
394 .await
395 .map_err(|e| RpcClientError::RequestFailed {
396 method: methods::STATE_GET_KEYS_PAGED,
397 message: e.to_string(),
398 })
399 }
400
401 pub async fn metadata(&self, at: H256) -> Result<Metadata, RpcClientError> {
408 let raw = self.legacy.read().await.state_get_metadata(Some(at)).await.map_err(|e| {
409 RpcClientError::RequestFailed {
410 method: methods::STATE_GET_METADATA,
411 message: e.to_string(),
412 }
413 })?;
414
415 let raw_bytes = raw.into_raw();
416 match Metadata::decode(&mut raw_bytes.as_slice()) {
417 Ok(metadata) => Ok(metadata),
418 Err(default_err) => {
419 for version in (METADATA_V14..=METADATA_LATEST).rev() {
421 if let Some(bytes) = self.metadata_at_version(version, at).await? &&
422 let Ok(metadata) = Metadata::decode(&mut bytes.as_slice())
423 {
424 return Ok(metadata);
425 }
426 }
427 Err(RpcClientError::MetadataDecodingFailed(default_err.to_string()))
428 },
429 }
430 }
431
432 async fn metadata_at_version(
438 &self,
439 version: u32,
440 at: H256,
441 ) -> Result<Option<Vec<u8>>, RpcClientError> {
442 let result = self
443 .legacy
444 .read()
445 .await
446 .state_call("Metadata_metadata_at_version", Some(&version.encode()), Some(at))
447 .await
448 .map_err(|e| RpcClientError::RequestFailed {
449 method: methods::STATE_CALL,
450 message: e.to_string(),
451 })?;
452
453 let opaque: Option<Vec<u8>> = Decode::decode(&mut result.as_slice()).map_err(|e| {
456 RpcClientError::InvalidResponse(format!(
457 "Failed to decode metadata_at_version response: {e}"
458 ))
459 })?;
460
461 Ok(opaque)
462 }
463
464 pub async fn runtime_code(&self, at: H256) -> Result<Vec<u8>, RpcClientError> {
468 let code_key = sp_core::storage::well_known_keys::CODE;
470
471 self.storage(code_key, at)
472 .await?
473 .ok_or_else(|| RpcClientError::StorageNotFound(storage_keys::CODE.to_string()))
474 }
475
476 pub async fn system_chain(&self) -> Result<String, RpcClientError> {
478 self.legacy
479 .read()
480 .await
481 .system_chain()
482 .await
483 .map_err(|e| RpcClientError::RequestFailed {
484 method: methods::SYSTEM_CHAIN,
485 message: e.to_string(),
486 })
487 }
488
489 pub async fn state_call(
495 &self,
496 function: &str,
497 call_parameters: &[u8],
498 at: Option<H256>,
499 ) -> Result<Vec<u8>, RpcClientError> {
500 self.legacy
501 .read()
502 .await
503 .state_call(function, Some(call_parameters), at)
504 .await
505 .map_err(|e| RpcClientError::RequestFailed {
506 method: methods::STATE_CALL,
507 message: e.to_string(),
508 })
509 }
510
511 pub async fn system_properties(
513 &self,
514 ) -> Result<subxt::backend::legacy::rpc_methods::SystemProperties, RpcClientError> {
515 self.legacy.read().await.system_properties().await.map_err(|e| {
516 RpcClientError::RequestFailed {
517 method: methods::SYSTEM_PROPERTIES,
518 message: e.to_string(),
519 }
520 })
521 }
522}
523
#[cfg(test)]
mod tests {
    use super::*;

    // The `error_display_*` tests pin the user-facing text of each `RpcClientError` variant.

    #[test]
    fn error_display_connection_failed() {
        let endpoint = "wss://example.com".to_string();
        let message = "connection refused".to_string();

        let rendered = RpcClientError::ConnectionFailed { endpoint, message }.to_string();

        assert_eq!(rendered, "Failed to connect to wss://example.com: connection refused");
    }

    #[test]
    fn error_display_request_failed() {
        let expected =
            format!("RPC request `{}` failed: connection reset", methods::STATE_GET_STORAGE);

        let rendered = RpcClientError::RequestFailed {
            method: methods::STATE_GET_STORAGE,
            message: "connection reset".to_string(),
        }
        .to_string();

        assert_eq!(rendered, expected);
    }

    #[test]
    fn error_display_timeout() {
        let expected = format!("RPC request `{}` timed out", methods::STATE_GET_METADATA);

        let rendered =
            RpcClientError::Timeout { method: methods::STATE_GET_METADATA }.to_string();

        assert_eq!(rendered, expected);
    }

    #[test]
    fn error_display_invalid_response() {
        let rendered = RpcClientError::InvalidResponse("missing field".to_string()).to_string();

        assert_eq!(rendered, "Invalid RPC response: missing field");
    }

    #[test]
    fn error_display_storage_not_found() {
        let expected = format!("Required storage key not found: {}", storage_keys::CODE);

        let rendered = RpcClientError::StorageNotFound(storage_keys::CODE.to_string()).to_string();

        assert_eq!(rendered, expected);
    }

    /// Connecting to a local address (port 19999, assumed to have no listener) must
    /// surface as `ConnectionFailed`.
    #[tokio::test]
    async fn connect_to_invalid_endpoint_fails() {
        let endpoint: Url = "ws://127.0.0.1:19999".parse().unwrap();

        let outcome = ForkRpcClient::connect(&endpoint).await;
        assert!(outcome.is_err());

        let err = outcome.unwrap_err();
        let is_connection_failure = matches!(err, RpcClientError::ConnectionFailed { .. });
        assert!(is_connection_failure, "Expected ConnectionFailed, got: {err:?}");
    }
}