Skip to main content

pop_fork/
rpc.rs

1// SPDX-License-Identifier: GPL-3.0
2
3//! RPC client wrapper for connecting to live Polkadot-SDK chains.
4//!
5//! Provides fork-specific RPC functionality for lazy-loading storage from live chains.
6//!
7//! # Design Decision: Why This Wrapper Exists
8//!
9//! This module wraps subxt's [`LegacyRpcMethods`], which means we are **duplicating**
10//! some of subxt's API surface. We could use `LegacyRpcMethods` directly throughout
11//! the codebase, but we chose to add this layer for the following reasons:
12//!
13//! 1. **Focused API surface**: `LegacyRpcMethods` exposes many methods we don't need. This wrapper
14//!    exposes only what's relevant for fork operations, making the crate easier to understand and
15//!    use.
16//!
17//! 2. **Ergonomic error handling**: subxt's errors are generic. This wrapper provides
18//!    [`RpcClientError`] with fork-specific error variants and messages.
19//!
20//! 3. **Convenience methods**: Methods like [`ForkRpcClient::runtime_code`] encapsulate domain
21//!    knowledge (fetching the `:code` storage key) that would otherwise be scattered across the
22//!    codebase.
23//!
24//! 4. **Insulation from subxt internals**: If subxt changes its API, we only need to update this
25//!    wrapper rather than every call site.
26//!
27//! The tradeoff is maintaining this thin layer, but we believe the ergonomic benefits
28//! justify the small amount of extra code.
29//!
30//! # Why Legacy RPCs?
31//!
32//! We use subxt's `LegacyRpcMethods` (`state_*`, `chain_*`) rather than the newer
33//! `chainHead_v1_*` or `archive_v1_*` specifications because:
34//!
35//! 1. **Universal support**: Legacy RPCs work with all Polkadot SDK nodes. The newer specs may not
36//!    be available on all endpoints.
37//!
38//! 2. **Simplicity**: Legacy RPCs use request/response patterns. The new specs require subscription
39//!    lifecycle management (follow/unfollow, pin/unpin) which adds complexity for our use case of
40//!    querying a specific historical block.
41//!
42//! 3. **Precedent**: Tools like [chopsticks](https://github.com/AcalaNetwork/chopsticks) use legacy
43//!    RPCs for fetching from upstream chains.
44//!
45//! Note: subxt marks legacy methods as "not advised" but they remain widely used.
46//! This decision should be revisited if the ecosystem moves away from legacy RPCs.
47
48use crate::{
49	error::rpc::RpcClientError,
50	strings::rpc::{methods, storage_keys},
51};
52use scale::{Decode, Encode};
53use std::sync::Arc;
54use subxt::{
55	Metadata, SubstrateConfig,
56	backend::{
57		legacy::{LegacyRpcMethods, rpc_methods::Block},
58		rpc::RpcClient,
59	},
60	config::substrate::H256,
61};
62use tokio::sync::{Mutex, RwLock, Semaphore};
63use url::Url;
64
/// How long, in seconds, to wait for the upstream WebSocket connection to be established.
///
/// Deliberately generous: heavily loaded CI runners can take a while to accept the
/// WebSocket upgrade, and a short timeout turns that into flaky connection failures.
const WS_CONNECT_TIMEOUT_SECS: u64 = 30;
70
/// Number of permits in the semaphore guarding heavy upstream storage calls.
///
/// `storage_batch()` and `storage_keys_paged()` each take one permit per call, so at most
/// this many of them are in flight at once. This keeps the upstream WebSocket endpoint from
/// being overwhelmed when many callers hit the RPC server simultaneously (e.g., polkadot.js
/// sending 14 concurrent `state_queryStorageAt` requests).
const MAX_CONCURRENT_UPSTREAM_CALLS: usize = 4;
77
/// Lowest metadata version requested during the metadata fallback.
const METADATA_V14: u32 = 14;
/// Highest metadata version requested during the metadata fallback (tried first).
const METADATA_LATEST: u32 = 15;
82
83/// RPC client wrapper for fork operations.
84///
85/// Wraps subxt's [`LegacyRpcMethods`] to provide a focused API for fetching state
86/// from live Polkadot-SDK chains. See the module-level documentation for why this
87/// wrapper exists rather than using `LegacyRpcMethods` directly.
88///
89/// # Example
90///
91/// ```ignore
92/// use pop_fork::ForkRpcClient;
93///
94/// let client = ForkRpcClient::connect(&"wss://rpc.polkadot.io".parse()?).await?;
95/// let block_hash = client.finalized_head().await?;
96/// let metadata = client.metadata(block_hash).await?;
97/// let storage_value = client.storage(&key, block_hash).await?;
98/// ```
99#[derive(Clone)]
100pub struct ForkRpcClient {
101	legacy: Arc<RwLock<LegacyRpcMethods<SubstrateConfig>>>,
102	endpoint: Url,
103	/// Semaphore limiting concurrent upstream calls for heavy storage methods.
104	upstream_semaphore: Arc<Semaphore>,
105	/// Lock that serializes reconnection attempts so only one task reconnects
106	/// at a time. Without this, a dropped upstream connection causes every
107	/// concurrent task to call `reconnect()` simultaneously (thundering herd),
108	/// overwhelming the endpoint with dozens of parallel WebSocket handshakes.
109	reconnect_lock: Arc<Mutex<()>>,
110}
111
112impl std::fmt::Debug for ForkRpcClient {
113	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
114		f.debug_struct("ForkRpcClient").field("endpoint", &self.endpoint).finish()
115	}
116}
117
118impl ForkRpcClient {
119	/// Connect to a live Polkadot-SDK chain.
120	///
121	/// # Arguments
122	/// * `endpoint` - WebSocket URL of the chain's RPC endpoint (e.g., `wss://rpc.polkadot.io`)
123	///
124	/// # Example
125	/// ```ignore
126	/// let client = ForkRpcClient::connect(&"wss://rpc.polkadot.io".parse()?).await?;
127	/// ```
128	pub async fn connect(endpoint: &Url) -> Result<Self, RpcClientError> {
129		let legacy = Self::create_connection(endpoint).await?;
130		Ok(Self {
131			legacy: Arc::new(RwLock::new(legacy)),
132			endpoint: endpoint.clone(),
133			upstream_semaphore: Arc::new(Semaphore::new(MAX_CONCURRENT_UPSTREAM_CALLS)),
134			reconnect_lock: Arc::new(Mutex::new(())),
135		})
136	}
137
138	/// Create a new connection to the endpoint.
139	///
140	/// Builds a jsonrpsee WS client with raised message-size limits so that
141	/// large `state_queryStorageAt` responses (common when batch-fetching
142	/// hundreds of storage keys) don't hit soketto's default 10 MB cap.
143	async fn create_connection(
144		endpoint: &Url,
145	) -> Result<LegacyRpcMethods<SubstrateConfig>, RpcClientError> {
146		use jsonrpsee::ws_client::WsClientBuilder;
147
148		let client = WsClientBuilder::default()
149			.max_response_size(u32::MAX)
150			.connection_timeout(std::time::Duration::from_secs(WS_CONNECT_TIMEOUT_SECS))
151			.build(endpoint.as_str())
152			.await
153			.map_err(|e| RpcClientError::ConnectionFailed {
154				endpoint: endpoint.to_string(),
155				message: e.to_string(),
156			})?;
157		let rpc_client = RpcClient::new(client);
158		Ok(LegacyRpcMethods::new(rpc_client))
159	}
160
161	/// Reconnect to the upstream RPC endpoint.
162	///
163	/// Creates a fresh WebSocket connection, replacing the existing one. All clones
164	/// of this client share the connection, so reconnecting affects all of them.
165	///
166	/// Serialized via `reconnect_lock`: only one task performs the actual reconnection.
167	/// Other concurrent callers wait for the lock, then verify the connection is alive
168	/// before attempting another reconnect (avoiding the thundering herd problem).
169	pub async fn reconnect(&self) -> Result<(), RpcClientError> {
170		let _guard = self.reconnect_lock.lock().await;
171
172		// Another task may have already reconnected while we waited for the lock.
173		// Do a cheap liveness check before creating a new connection.
174		if self.legacy.read().await.system_chain().await.is_ok() {
175			return Ok(());
176		}
177
178		let new_legacy = Self::create_connection(&self.endpoint).await?;
179		*self.legacy.write().await = new_legacy;
180		Ok(())
181	}
182
183	/// Get the endpoint URL this client is connected to.
184	pub fn endpoint(&self) -> &Url {
185		&self.endpoint
186	}
187
188	/// Get the latest finalized block hash.
189	///
190	/// This is typically the starting point for forking - we fork from the latest
191	/// finalized state to ensure consistency.
192	pub async fn finalized_head(&self) -> Result<H256, RpcClientError> {
193		self.legacy.read().await.chain_get_finalized_head().await.map_err(|e| {
194			RpcClientError::RequestFailed {
195				method: methods::CHAIN_GET_FINALIZED_HEAD,
196				message: e.to_string(),
197			}
198		})
199	}
200
201	/// Get block header by hash.
202	///
203	/// Returns the header for the specified block, which contains the parent hash,
204	/// state root, extrinsics root, and digest.
205	pub async fn header(
206		&self,
207		hash: H256,
208	) -> Result<<SubstrateConfig as subxt::Config>::Header, RpcClientError> {
209		self.legacy
210			.read()
211			.await
212			.chain_get_header(Some(hash))
213			.await
214			.map_err(|e| RpcClientError::RequestFailed {
215				method: methods::CHAIN_GET_HEADER,
216				message: e.to_string(),
217			})?
218			.ok_or_else(|| RpcClientError::InvalidResponse(format!("No header found for {hash:?}")))
219	}
220
221	/// Get a block hash by its number.
222	///
223	/// # Arguments
224	/// * `block_number` - The block number to query
225	///
226	/// # Returns
227	/// * `Ok(Some(hash))` - Block exists with this hash
228	/// * `Ok(None)` - Block number doesn't exist yet
229	/// * `Err(_)` - RPC error
230	pub async fn block_hash_at(&self, block_number: u32) -> Result<Option<H256>, RpcClientError> {
231		self.legacy
232			.read()
233			.await
234			.chain_get_block_hash(Some(block_number.into()))
235			.await
236			.map_err(|e| RpcClientError::RequestFailed {
237				method: methods::CHAIN_GET_BLOCK_HASH,
238				message: e.to_string(),
239			})
240	}
241
242	/// Get full block data by block number.
243	///
244	/// This method first fetches the block hash for the given block number using
245	/// `chain_getBlockHash`, then fetches the full block data using `chain_getBlock`.
246	///
247	/// # Arguments
248	/// * `block_number` - The block number to query
249	///
250	/// # Returns
251	/// * `Ok(Some((hash, block)))` - Block exists with hash and data
252	/// * `Ok(None)` - Block number doesn't exist yet
253	/// * `Err(_)` - RPC error
254	pub async fn block_by_number(
255		&self,
256		block_number: u32,
257	) -> Result<Option<(H256, Block<SubstrateConfig>)>, RpcClientError> {
258		// Get block hash from block number
259		let block_hash = self.block_hash_at(block_number).await?;
260
261		let block_hash = match block_hash {
262			Some(hash) => hash,
263			None => return Ok(None),
264		};
265
266		// Get full block data
267		let block =
268			self.legacy.read().await.chain_get_block(Some(block_hash)).await.map_err(|e| {
269				RpcClientError::RequestFailed {
270					method: methods::CHAIN_GET_BLOCK,
271					message: e.to_string(),
272				}
273			})?;
274
275		Ok(block.map(|block| (block_hash, block.block)))
276	}
277
278	/// Get full block data by block hash.
279	///
280	/// # Arguments
281	/// * `block_hash` - The block hash to query
282	///
283	/// # Returns
284	/// * `Ok(Some(block))` - Block exists
285	/// * `Ok(None)` - Block hash not found
286	/// * `Err(_)` - RPC error
287	pub async fn block_by_hash(
288		&self,
289		block_hash: H256,
290	) -> Result<Option<Block<SubstrateConfig>>, RpcClientError> {
291		let block =
292			self.legacy.read().await.chain_get_block(Some(block_hash)).await.map_err(|e| {
293				RpcClientError::RequestFailed {
294					method: methods::CHAIN_GET_BLOCK,
295					message: e.to_string(),
296				}
297			})?;
298
299		Ok(block.map(|b| b.block))
300	}
301
302	/// Get a single storage value at a specific block.
303	///
304	/// # Arguments
305	/// * `key` - The storage key (raw bytes)
306	/// * `at` - The block hash to query state at
307	///
308	/// # Returns
309	/// * `Ok(Some(value))` - Storage exists with value
310	/// * `Ok(None)` - Storage key doesn't exist (empty)
311	/// * `Err(_)` - RPC error
312	pub async fn storage(&self, key: &[u8], at: H256) -> Result<Option<Vec<u8>>, RpcClientError> {
313		self.legacy.read().await.state_get_storage(key, Some(at)).await.map_err(|e| {
314			RpcClientError::RequestFailed {
315				method: methods::STATE_GET_STORAGE,
316				message: e.to_string(),
317			}
318		})
319	}
320
321	/// Get multiple storage values in a single batch request.
322	///
323	/// More efficient than multiple individual `storage()` calls when fetching
324	/// many keys at once.
325	///
326	/// # Arguments
327	/// * `keys` - Slice of storage keys to fetch
328	/// * `at` - The block hash to query state at
329	///
330	/// # Returns
331	/// A vector of optional values, in the same order as the input keys.
332	pub async fn storage_batch(
333		&self,
334		keys: &[&[u8]],
335		at: H256,
336	) -> Result<Vec<Option<Vec<u8>>>, RpcClientError> {
337		if keys.is_empty() {
338			return Ok(vec![]);
339		}
340
341		let _permit = self.upstream_semaphore.acquire().await.expect("semaphore closed");
342
343		let result = self
344			.legacy
345			.read()
346			.await
347			.state_query_storage_at(keys.iter().copied(), Some(at))
348			.await
349			.map_err(|e| RpcClientError::RequestFailed {
350				method: methods::STATE_QUERY_STORAGE_AT,
351				message: e.to_string(),
352			})?;
353
354		// Build a map of key -> value from the response
355		let changes: std::collections::HashMap<Vec<u8>, Option<Vec<u8>>> = result
356			.into_iter()
357			.flat_map(|change_set| {
358				change_set.changes.into_iter().map(|(k, v)| {
359					let key_bytes = k.0.to_vec();
360					let value_bytes = v.map(|v| v.0.to_vec());
361					(key_bytes, value_bytes)
362				})
363			})
364			.collect();
365
366		// Return values in the same order as input keys, preserving duplicates.
367		let values = keys.iter().map(|key| changes.get::<[u8]>(key).cloned().flatten()).collect();
368
369		Ok(values)
370	}
371
372	/// Get storage keys matching a prefix, with pagination.
373	///
374	/// Useful for iterating over map storage items.
375	///
376	/// # Arguments
377	/// * `prefix` - The storage key prefix to match
378	/// * `count` - Maximum number of keys to return
379	/// * `start_key` - Optional key to start from (for pagination)
380	/// * `at` - The block hash to query state at
381	pub async fn storage_keys_paged(
382		&self,
383		prefix: &[u8],
384		count: u32,
385		start_key: Option<&[u8]>,
386		at: H256,
387	) -> Result<Vec<Vec<u8>>, RpcClientError> {
388		let _permit = self.upstream_semaphore.acquire().await.expect("semaphore closed");
389
390		self.legacy
391			.read()
392			.await
393			.state_get_keys_paged(prefix, count, start_key, Some(at))
394			.await
395			.map_err(|e| RpcClientError::RequestFailed {
396				method: methods::STATE_GET_KEYS_PAGED,
397				message: e.to_string(),
398			})
399	}
400
401	/// Get runtime metadata at a specific block.
402	///
403	/// Attempts to fetch and decode metadata via `state_getMetadata`. If decoding
404	/// fails (e.g., due to type registry inconsistencies in the chain's metadata),
405	/// falls back to requesting specific metadata versions via
406	/// `Metadata_metadata_at_version` runtime API (latest down to V14).
407	pub async fn metadata(&self, at: H256) -> Result<Metadata, RpcClientError> {
408		let raw = self.legacy.read().await.state_get_metadata(Some(at)).await.map_err(|e| {
409			RpcClientError::RequestFailed {
410				method: methods::STATE_GET_METADATA,
411				message: e.to_string(),
412			}
413		})?;
414
415		let raw_bytes = raw.into_raw();
416		match Metadata::decode(&mut raw_bytes.as_slice()) {
417			Ok(metadata) => Ok(metadata),
418			Err(default_err) => {
419				// Try explicit version requests as fallback.
420				for version in (METADATA_V14..=METADATA_LATEST).rev() {
421					if let Some(bytes) = self.metadata_at_version(version, at).await? &&
422						let Ok(metadata) = Metadata::decode(&mut bytes.as_slice())
423					{
424						return Ok(metadata);
425					}
426				}
427				Err(RpcClientError::MetadataDecodingFailed(default_err.to_string()))
428			},
429		}
430	}
431
432	/// Request metadata at a specific version via the `Metadata_metadata_at_version`
433	/// runtime API.
434	///
435	/// Returns `Ok(Some(bytes))` if the chain supports the requested version,
436	/// `Ok(None)` if it does not, or an error if the RPC call itself fails.
437	async fn metadata_at_version(
438		&self,
439		version: u32,
440		at: H256,
441	) -> Result<Option<Vec<u8>>, RpcClientError> {
442		let result = self
443			.legacy
444			.read()
445			.await
446			.state_call("Metadata_metadata_at_version", Some(&version.encode()), Some(at))
447			.await
448			.map_err(|e| RpcClientError::RequestFailed {
449				method: methods::STATE_CALL,
450				message: e.to_string(),
451			})?;
452
453		// The runtime returns SCALE-encoded `Option<OpaqueMetadata>` where
454		// `OpaqueMetadata` is `Vec<u8>`.
455		let opaque: Option<Vec<u8>> = Decode::decode(&mut result.as_slice()).map_err(|e| {
456			RpcClientError::InvalidResponse(format!(
457				"Failed to decode metadata_at_version response: {e}"
458			))
459		})?;
460
461		Ok(opaque)
462	}
463
464	/// Get the runtime WASM code at a specific block.
465	///
466	/// This fetches the `:code` storage key which contains the runtime WASM blob.
467	pub async fn runtime_code(&self, at: H256) -> Result<Vec<u8>, RpcClientError> {
468		// :code storage key.
469		let code_key = sp_core::storage::well_known_keys::CODE;
470
471		self.storage(code_key, at)
472			.await?
473			.ok_or_else(|| RpcClientError::StorageNotFound(storage_keys::CODE.to_string()))
474	}
475
476	/// Get the chain name from system properties.
477	pub async fn system_chain(&self) -> Result<String, RpcClientError> {
478		self.legacy
479			.read()
480			.await
481			.system_chain()
482			.await
483			.map_err(|e| RpcClientError::RequestFailed {
484				method: methods::SYSTEM_CHAIN,
485				message: e.to_string(),
486			})
487	}
488
489	/// Execute a runtime API call via `state_call` on the upstream chain.
490	///
491	/// This is useful for proxying computationally expensive runtime calls (like metadata
492	/// generation) to the upstream node, which has a JIT-compiled runtime and handles them
493	/// much faster than the local WASM interpreter.
494	pub async fn state_call(
495		&self,
496		function: &str,
497		call_parameters: &[u8],
498		at: Option<H256>,
499	) -> Result<Vec<u8>, RpcClientError> {
500		self.legacy
501			.read()
502			.await
503			.state_call(function, Some(call_parameters), at)
504			.await
505			.map_err(|e| RpcClientError::RequestFailed {
506				method: methods::STATE_CALL,
507				message: e.to_string(),
508			})
509	}
510
511	/// Get system properties (token decimals, symbols, etc.).
512	pub async fn system_properties(
513		&self,
514	) -> Result<subxt::backend::legacy::rpc_methods::SystemProperties, RpcClientError> {
515		self.legacy.read().await.system_properties().await.map_err(|e| {
516			RpcClientError::RequestFailed {
517				method: methods::SYSTEM_PROPERTIES,
518				message: e.to_string(),
519			}
520		})
521	}
522}
523
#[cfg(test)]
mod tests {
	use super::*;

	/// `ConnectionFailed` renders the endpoint followed by the underlying message.
	#[test]
	fn error_display_connection_failed() {
		let endpoint = "wss://example.com".to_string();
		let message = "connection refused".to_string();
		let rendered = RpcClientError::ConnectionFailed { endpoint, message }.to_string();
		assert_eq!(rendered, "Failed to connect to wss://example.com: connection refused");
	}

	/// `RequestFailed` renders the RPC method name followed by the message.
	#[test]
	fn error_display_request_failed() {
		let method = methods::STATE_GET_STORAGE;
		let rendered =
			RpcClientError::RequestFailed { method, message: "connection reset".to_string() }
				.to_string();
		let expected = format!("RPC request `{}` failed: connection reset", method);
		assert_eq!(rendered, expected);
	}

	/// `Timeout` renders the RPC method name.
	#[test]
	fn error_display_timeout() {
		let method = methods::STATE_GET_METADATA;
		let rendered = RpcClientError::Timeout { method }.to_string();
		let expected = format!("RPC request `{}` timed out", method);
		assert_eq!(rendered, expected);
	}

	/// `InvalidResponse` renders its description.
	#[test]
	fn error_display_invalid_response() {
		let rendered = RpcClientError::InvalidResponse("missing field".to_string()).to_string();
		assert_eq!(rendered, "Invalid RPC response: missing field");
	}

	/// `StorageNotFound` renders the name of the missing key.
	#[test]
	fn error_display_storage_not_found() {
		let rendered = RpcClientError::StorageNotFound(storage_keys::CODE.to_string()).to_string();
		let expected = format!("Required storage key not found: {}", storage_keys::CODE);
		assert_eq!(rendered, expected);
	}

	/// Connecting to an endpoint with no listener must report `ConnectionFailed`.
	#[tokio::test]
	async fn connect_to_invalid_endpoint_fails() {
		// Use a port that's unlikely to have anything listening
		let endpoint: Url = "ws://127.0.0.1:19999".parse().unwrap();

		let result = ForkRpcClient::connect(&endpoint).await;
		assert!(result.is_err());

		if let Err(err) = result {
			assert!(
				matches!(err, RpcClientError::ConnectionFailed { .. }),
				"Expected ConnectionFailed, got: {err:?}"
			);
		}
	}
}