subrpc_core/
registry.rs

1use crate::{default_true, empty_string_array, ChainName, RegistryUrl};
2use crate::{endpoint::Endpoint, EndpointUrl};
3use anyhow::Result;
4use chrono::{DateTime, Local};
5use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params, ws_client::WsClientBuilder};
6use log::*;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt::Display;
10use std::time::Instant;
11use std::{
12	fs::File,
13	io::{Read, Write},
14	path::PathBuf,
15};
16use tokio::runtime::Runtime;
17
18#[derive(Eq, Debug, Deserialize, Serialize)]
19pub struct Registry {
20	/// Data won't be pulled from a disabled registry
21	#[serde(default = "default_true")]
22	pub enabled: bool,
23
24	/// Name of the registry
25	pub name: String,
26
27	/// URL of the registry, there may be none for local/default registries
28	pub url: Option<RegistryUrl>,
29
30	/// Optional labels
31	#[serde(default = "empty_string_array")]
32	pub labels: Vec<String>,
33
34	/// DateTime of the last update of the data
35	pub last_update: Option<DateTime<Local>>,
36
37	/// Items of the registry
38	pub rpc_endpoints: HashMap<ChainName, Vec<Endpoint>>,
39}
40
41impl PartialEq for Registry {
42	fn eq(&self, other: &Self) -> bool {
43		self.name == other.name
44	}
45}
46
47impl Registry {
48	pub fn new(name: &str, url: &str) -> Self {
49		Self {
50			name: name.to_string(),
51			url: Some(url.to_string()),
52			rpc_endpoints: HashMap::new(),
53			enabled: true,
54			last_update: None,
55			labels: vec![],
56		}
57	}
58
59	/// Fetch the information from located at the registry's url and update the registry items
60	pub fn update(&mut self) -> Result<()> {
61		if !self.enabled {
62			warn!("Registry is disabled, skipping...");
63			return Ok(());
64		}
65
66		if self.url.is_none() {
67			warn!("Registry '{}' has no URL, skipping...", self.name);
68			return Ok(());
69		}
70
71		// reg.items.iter().for_each(|(name, endpoints)| {
72		//     debug!("   - {}", name);
73		//     endpoints.iter().for_each(|e| {
74		//         debug!("     - {} {}", e.name, e.url);
75		//     });
76		// });
77
78		if let Some(registry_url) = &self.url {
79			let reg = reqwest::blocking::get(registry_url)?.json::<Registry>()?;
80
81			self.rpc_endpoints = reg.rpc_endpoints;
82			debug!("Found {:?} items", self.rpc_endpoints.len());
83		} else {
84			log::warn!("No URL, skipping...");
85		}
86
87		Ok(())
88	}
89
90	/// Ping all endpoints and refresh the stats
91	pub fn refresh_stats(&mut self) {
92		self.rpc_endpoints.iter_mut().for_each(|(_name, endpoints)| {
93			endpoints.iter_mut().for_each(|endpoint| {
94				let (success, latency) = Self::ping(endpoint).unwrap_or((false, None));
95				let stats = &mut endpoint.stats;
96				stats.add(success, latency)
97			})
98		})
99	}
100
101	/// Ping all endpoints and print the results to stdout.
102	///
103	/// Calling this function does NOT refresh the stats.
104	pub fn ping_all(&mut self) {
105		self.rpc_endpoints.iter_mut().for_each(|(_name, endpoints)| {
106			endpoints.iter_mut().for_each(|endpoint| match Self::ping(endpoint) {
107				Ok((success, latency)) => {
108					if success {
109						print!("✅ {:0.3}s", latency.unwrap_or(0f32));
110					} else {
111						print!("{: <8}", "❌");
112					}
113					println!(" - {:<20} {}", endpoint.name, endpoint.url);
114				}
115				Err(e) => {
116					eprint!("{: <8}", "❌");
117					eprintln!("{}: {e}", endpoint.url);
118				}
119			})
120		})
121	}
122
123	pub fn ping(e: &Endpoint) -> Result<(bool, Option<f32>)> {
124		debug!("pinging endpoint {} at {}", e.name, e.url);
125		let rt = Runtime::new().unwrap();
126		let start = Instant::now();
127
128		let response: Result<String> = match &e.url {
129			EndpointUrl::Https(url) | EndpointUrl::Http(url) => {
130				debug!("Detected HTTP/S");
131				let client = HttpClientBuilder::default().build(url)?;
132				rt.block_on(client.request("system_chain", rpc_params![])).map_err(anyhow::Error::msg)
133			}
134			EndpointUrl::Wss(url) | EndpointUrl::Ws(url) => {
135				debug!("Detected WS/S");
136				let client = rt.block_on(WsClientBuilder::default().build(url))?;
137				rt.block_on(client.request("system_chain", rpc_params![])).map_err(anyhow::Error::msg)
138			}
139		};
140		debug!("response = {:?}", response);
141		let duration = start.elapsed().as_millis() as f32 / 1000f32;
142		let success = response.is_ok();
143		rt.shutdown_background();
144		Ok((success, Some(duration)))
145	}
146
147	pub fn save(&self, file: PathBuf) -> Result<()> {
148		let json = serde_json::to_string_pretty(self)?;
149		let mut fs = File::create(file)?;
150		fs.write_all(json.as_bytes())?;
151		Ok(())
152	}
153
154	pub fn load(file: PathBuf) -> Self {
155		let mut fs = File::open(file).expect("File should be valid");
156		let mut s = String::new();
157		fs.read_to_string(&mut s).expect("Fail reading registry");
158		serde_json::from_str(&s).expect("Format should be valid")
159	}
160
161	pub fn load_from_url(url: &str) -> Result<Self> {
162		debug!("Adding registry from {url}");
163		reqwest::blocking::get(url)?.json::<Registry>().map_err(anyhow::Error::msg)
164	}
165
166	pub fn default_bad() -> Self {
167		let rpc_endpoints = HashMap::from([
168			(
169				"Polkadot".to_string(),
170				vec![
171					Endpoint::new("Parity", "wss://rpc.polkadot.io:443", vec!["Parity".to_string()], vec![]),
172					Endpoint::new(
173						"OnFinality",
174						"wss://polkadot.api.onfinality.io:443/public-ws",
175						vec!["OnFinality".to_string()],
176						vec![],
177					),
178				],
179			),
180			(
181				"Kusama".to_string(),
182				vec![
183					Endpoint::new("Parity", "wss://kusama-rpc.polkadot.io:443", vec!["Parity".to_string()], vec![]),
184					Endpoint::new(
185						"Parity Bad",
186						"wss://bad-rpc.polkadot.io:443",
187						vec!["Parity".to_string(), "Bad".to_string()],
188						vec![],
189					),
190				],
191			),
192		]);
193
194		Self { rpc_endpoints, ..Default::default() }
195	}
196}
197
198impl Default for Registry {
199	fn default() -> Self {
200		let rpc_endpoints = HashMap::from([
201			(
202				"Polkadot".to_string(),
203				vec![
204					Endpoint::new("Parity", "wss://rpc.polkadot.io:443", vec!["Parity".to_string()], vec![]),
205					Endpoint::new(
206						"OnFinality",
207						"wss://polkadot.api.onfinality.io:443/public-ws",
208						vec!["OnFinality".to_string()],
209						vec![],
210					),
211				],
212			),
213			(
214				"Kusama".to_string(),
215				vec![Endpoint::new("Parity", "wss://kusama-rpc.polkadot.io:443", vec!["Parity".to_string()], vec![])],
216			),
217		]);
218
219		Self {
220			name: "SubRPC Default".to_string(),
221			url: None,
222			rpc_endpoints,
223			enabled: true,
224			last_update: None,
225			labels: vec![],
226		}
227	}
228}
229
230impl Display for Registry {
231	fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232		let _ = f.write_fmt(format_args!(
233			"Registry: {} (url: {})\n",
234			&self.name,
235			&self.url.clone().unwrap_or("n/a".to_string())
236		));
237
238		self.rpc_endpoints.iter().for_each(|(name, endpoints)| {
239			let _ = f.write_fmt(format_args!("  - {name}\n"));
240			endpoints.iter().for_each(|e| {
241				let _ = f.write_fmt(format_args!(
242					"    - {}: {:?}\n",
243					e.name,
244					// e.url,
245					e.stats
246				));
247			})
248		});
249		Ok(())
250	}
251}
252
#[cfg(test)]
mod test_super {
	use std::{env, fs, path::PathBuf};

	use super::*;

	/// Build the path of `file_name` inside the system temp directory.
	///
	/// Every test passes its own file name: tests may run in parallel and would otherwise
	/// read and write the same file concurrently.
	fn temp_target(file_name: &str) -> PathBuf {
		env::temp_dir().join(file_name)
	}

	/// The default registry can be serialized to JSON.
	#[test]
	fn test_default() {
		let reg1 = Registry::default();
		let json = ::serde_json::to_string_pretty(&reg1).unwrap();
		println!("json= {json}");
	}

	/// Refreshing the stats twice in a row does not panic. Pings the default endpoints.
	#[test]
	fn test_refresh_stats() {
		let mut reg1 = Registry::default();
		reg1.refresh_stats();
		println!("{}", &reg1);
		reg1.refresh_stats();
		println!("{}", &reg1);
	}

	/// Pinging all the default endpoints does not panic.
	#[test]
	fn test_ping_all() {
		let mut reg1 = Registry::default();
		reg1.ping_all();
	}

	/// Every endpoint of the default registry answers the ping successfully.
	#[test]
	fn test_ping_each() {
		let reg1 = Registry::default();
		reg1.rpc_endpoints.iter().for_each(|(_chain, endpoints)| {
			endpoints.iter().for_each(|e| {
				println!("Checking {}: {:?}", e.name, e.url);
				let (success, duration) = Registry::ping(e).unwrap();
				println!("{} => {:?} {:?}", e.name, success, duration);
				assert!(success);
			});
		});
	}

	/// The default registry can be saved to a file.
	#[test]
	fn test_save() {
		let reg1 = Registry::default();
		let target_file = temp_target("subrpc_test_save.json");
		println!("Saving to {target_file:?}");
		assert!(reg1.save(target_file.clone()).is_ok());
		// Best-effort cleanup, a leftover file is harmless.
		let _ = fs::remove_file(target_file);
	}

	/// A saved registry can be loaded back and compares equal to the original.
	#[test]
	fn test_save_load() {
		let reg1 = Registry::default();
		let target_file = temp_target("subrpc_test_save_load.json");
		assert!(reg1.save(target_file.clone()).is_ok());
		let reg2 = Registry::load(target_file.clone());
		assert_eq!(reg2, reg1);
		// Best-effort cleanup, a leftover file is harmless.
		let _ = fs::remove_file(target_file);
	}

	/// A registry can be fetched from a url. Downloads the sample registry from GitHub.
	#[test]
	fn test_load_from_url() {
		let test_url = "https://raw.githubusercontent.com/chevdor/subrpc/master/registry/sample1.json";
		let reg = Registry::load_from_url(test_url).unwrap();
		println!("{reg:#?}");
		assert_eq!("SubRPC Test Registry 1", reg.name);
	}
}