1use crate::{default_true, empty_string_array, ChainName, RegistryUrl};
2use crate::{endpoint::Endpoint, EndpointUrl};
3use anyhow::Result;
4use chrono::{DateTime, Local};
5use jsonrpsee::{core::client::ClientT, http_client::HttpClientBuilder, rpc_params, ws_client::WsClientBuilder};
6use log::*;
7use serde::{Deserialize, Serialize};
8use std::collections::HashMap;
9use std::fmt::Display;
10use std::time::Instant;
11use std::{
12 fs::File,
13 io::{Read, Write},
14 path::PathBuf,
15};
16use tokio::runtime::Runtime;
17
18#[derive(Eq, Debug, Deserialize, Serialize)]
19pub struct Registry {
20 #[serde(default = "default_true")]
22 pub enabled: bool,
23
24 pub name: String,
26
27 pub url: Option<RegistryUrl>,
29
30 #[serde(default = "empty_string_array")]
32 pub labels: Vec<String>,
33
34 pub last_update: Option<DateTime<Local>>,
36
37 pub rpc_endpoints: HashMap<ChainName, Vec<Endpoint>>,
39}
40
41impl PartialEq for Registry {
42 fn eq(&self, other: &Self) -> bool {
43 self.name == other.name
44 }
45}
46
47impl Registry {
48 pub fn new(name: &str, url: &str) -> Self {
49 Self {
50 name: name.to_string(),
51 url: Some(url.to_string()),
52 rpc_endpoints: HashMap::new(),
53 enabled: true,
54 last_update: None,
55 labels: vec![],
56 }
57 }
58
59 pub fn update(&mut self) -> Result<()> {
61 if !self.enabled {
62 warn!("Registry is disabled, skipping...");
63 return Ok(());
64 }
65
66 if self.url.is_none() {
67 warn!("Registry '{}' has no URL, skipping...", self.name);
68 return Ok(());
69 }
70
71 if let Some(registry_url) = &self.url {
79 let reg = reqwest::blocking::get(registry_url)?.json::<Registry>()?;
80
81 self.rpc_endpoints = reg.rpc_endpoints;
82 debug!("Found {:?} items", self.rpc_endpoints.len());
83 } else {
84 log::warn!("No URL, skipping...");
85 }
86
87 Ok(())
88 }
89
90 pub fn refresh_stats(&mut self) {
92 self.rpc_endpoints.iter_mut().for_each(|(_name, endpoints)| {
93 endpoints.iter_mut().for_each(|endpoint| {
94 let (success, latency) = Self::ping(endpoint).unwrap_or((false, None));
95 let stats = &mut endpoint.stats;
96 stats.add(success, latency)
97 })
98 })
99 }
100
101 pub fn ping_all(&mut self) {
105 self.rpc_endpoints.iter_mut().for_each(|(_name, endpoints)| {
106 endpoints.iter_mut().for_each(|endpoint| match Self::ping(endpoint) {
107 Ok((success, latency)) => {
108 if success {
109 print!("✅ {:0.3}s", latency.unwrap_or(0f32));
110 } else {
111 print!("{: <8}", "❌");
112 }
113 println!(" - {:<20} {}", endpoint.name, endpoint.url);
114 }
115 Err(e) => {
116 eprint!("{: <8}", "❌");
117 eprintln!("{}: {e}", endpoint.url);
118 }
119 })
120 })
121 }
122
123 pub fn ping(e: &Endpoint) -> Result<(bool, Option<f32>)> {
124 debug!("pinging endpoint {} at {}", e.name, e.url);
125 let rt = Runtime::new().unwrap();
126 let start = Instant::now();
127
128 let response: Result<String> = match &e.url {
129 EndpointUrl::Https(url) | EndpointUrl::Http(url) => {
130 debug!("Detected HTTP/S");
131 let client = HttpClientBuilder::default().build(url)?;
132 rt.block_on(client.request("system_chain", rpc_params![])).map_err(anyhow::Error::msg)
133 }
134 EndpointUrl::Wss(url) | EndpointUrl::Ws(url) => {
135 debug!("Detected WS/S");
136 let client = rt.block_on(WsClientBuilder::default().build(url))?;
137 rt.block_on(client.request("system_chain", rpc_params![])).map_err(anyhow::Error::msg)
138 }
139 };
140 debug!("response = {:?}", response);
141 let duration = start.elapsed().as_millis() as f32 / 1000f32;
142 let success = response.is_ok();
143 rt.shutdown_background();
144 Ok((success, Some(duration)))
145 }
146
147 pub fn save(&self, file: PathBuf) -> Result<()> {
148 let json = serde_json::to_string_pretty(self)?;
149 let mut fs = File::create(file)?;
150 fs.write_all(json.as_bytes())?;
151 Ok(())
152 }
153
154 pub fn load(file: PathBuf) -> Self {
155 let mut fs = File::open(file).expect("File should be valid");
156 let mut s = String::new();
157 fs.read_to_string(&mut s).expect("Fail reading registry");
158 serde_json::from_str(&s).expect("Format should be valid")
159 }
160
161 pub fn load_from_url(url: &str) -> Result<Self> {
162 debug!("Adding registry from {url}");
163 reqwest::blocking::get(url)?.json::<Registry>().map_err(anyhow::Error::msg)
164 }
165
166 pub fn default_bad() -> Self {
167 let rpc_endpoints = HashMap::from([
168 (
169 "Polkadot".to_string(),
170 vec![
171 Endpoint::new("Parity", "wss://rpc.polkadot.io:443", vec!["Parity".to_string()], vec![]),
172 Endpoint::new(
173 "OnFinality",
174 "wss://polkadot.api.onfinality.io:443/public-ws",
175 vec!["OnFinality".to_string()],
176 vec![],
177 ),
178 ],
179 ),
180 (
181 "Kusama".to_string(),
182 vec![
183 Endpoint::new("Parity", "wss://kusama-rpc.polkadot.io:443", vec!["Parity".to_string()], vec![]),
184 Endpoint::new(
185 "Parity Bad",
186 "wss://bad-rpc.polkadot.io:443",
187 vec!["Parity".to_string(), "Bad".to_string()],
188 vec![],
189 ),
190 ],
191 ),
192 ]);
193
194 Self { rpc_endpoints, ..Default::default() }
195 }
196}
197
198impl Default for Registry {
199 fn default() -> Self {
200 let rpc_endpoints = HashMap::from([
201 (
202 "Polkadot".to_string(),
203 vec![
204 Endpoint::new("Parity", "wss://rpc.polkadot.io:443", vec!["Parity".to_string()], vec![]),
205 Endpoint::new(
206 "OnFinality",
207 "wss://polkadot.api.onfinality.io:443/public-ws",
208 vec!["OnFinality".to_string()],
209 vec![],
210 ),
211 ],
212 ),
213 (
214 "Kusama".to_string(),
215 vec![Endpoint::new("Parity", "wss://kusama-rpc.polkadot.io:443", vec!["Parity".to_string()], vec![])],
216 ),
217 ]);
218
219 Self {
220 name: "SubRPC Default".to_string(),
221 url: None,
222 rpc_endpoints,
223 enabled: true,
224 last_update: None,
225 labels: vec![],
226 }
227 }
228}
229
230impl Display for Registry {
231 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
232 let _ = f.write_fmt(format_args!(
233 "Registry: {} (url: {})\n",
234 &self.name,
235 &self.url.clone().unwrap_or("n/a".to_string())
236 ));
237
238 self.rpc_endpoints.iter().for_each(|(name, endpoints)| {
239 let _ = f.write_fmt(format_args!(" - {name}\n"));
240 endpoints.iter().for_each(|e| {
241 let _ = f.write_fmt(format_args!(
242 " - {}: {:?}\n",
243 e.name,
244 e.stats
246 ));
247 })
248 });
249 Ok(())
250 }
251}
252
253#[cfg(test)]
254mod test_super {
255 use std::{env, path::Path};
256
257 use super::*;
258
259 #[test]
260 fn test_default() {
261 let reg1 = Registry::default();
262 let json = ::serde_json::to_string_pretty(®1).unwrap();
263 println!("json= {json}");
264 }
265
266 #[test]
267 fn test_refresh_stats() {
268 let mut reg1 = Registry::default();
269 reg1.refresh_stats();
270 println!("{}", ®1);
271 reg1.refresh_stats();
272 println!("{}", ®1);
273 }
274
275 #[test]
276 fn test_ping_all() {
277 let mut reg1 = Registry::default();
278 reg1.ping_all();
279 }
280
281 #[test]
282 fn test_ping_each() {
283 let reg1 = Registry::default();
284 reg1.rpc_endpoints.iter().for_each(|(_chain, endpoints)| {
285 endpoints.iter().for_each(|e| {
286 println!("Checking {}: {:?}", e.name, e.url);
287 let (success, duration) = Registry::ping(e).unwrap();
288 println!("{} => {:?} {:?}", e.name, success, duration);
289 assert!(success);
290 });
291 });
292 }
293
294 #[test]
295 fn test_save() {
296 let reg1 = Registry::default();
297 let tmpdir = env::temp_dir();
298 let target_file = Path::new(&tmpdir).join("subrpc.json");
299 println!("Saving to {target_file:?}");
300 assert!(reg1.save(target_file).is_ok());
301 }
302
303 #[test]
304 fn test_save_load() {
305 let reg1 = Registry::default();
306 let tmpdir = env::temp_dir();
307 let target_file = Path::new(&tmpdir).join("subrpc.json");
308 assert!(reg1.save(target_file.clone()).is_ok());
309 let reg2 = Registry::load(target_file);
310 assert_eq!(reg2, reg1);
311 }
312
313 #[test]
314 fn test_load_from_url() {
315 let test_url = "https://raw.githubusercontent.com/chevdor/subrpc/master/registry/sample1.json";
316 let reg = Registry::load_from_url(test_url).unwrap();
317 println!("{reg:#?}");
318 assert_eq!("SubRPC Test Registry 1", reg.name);
319 }
320}