1use std::{borrow::Borrow, sync::RwLock};
4
5use log::{info, warn};
6
7use crate::api::ElectrumApi;
8use crate::batch::Batch;
9use crate::config::Config;
10use crate::raw_client::*;
11use crate::types::*;
12use bp::{ScriptPubkey, Txid};
13use std::convert::TryFrom;
14
15pub enum ClientType {
21 #[allow(missing_docs)]
22 TCP(RawClient<ElectrumPlaintextStream>),
23 #[allow(missing_docs)]
24 SSL(RawClient<ElectrumSslStream>),
25 #[allow(missing_docs)]
26 Socks5(RawClient<ElectrumProxyStream>),
27}
28
29pub struct Client {
32 client_type: RwLock<ClientType>,
33 config: Config,
34 url: String,
35}
36
37macro_rules! impl_inner_call {
38 ( $self:expr, $name:ident $(, $args:expr)* ) => {
39 {
40 let mut errors = vec![];
41 loop {
42 let read_client = $self.client_type.read().unwrap();
43 let res = match &*read_client {
44 ClientType::TCP(inner) => inner.$name( $($args, )* ),
45 ClientType::SSL(inner) => inner.$name( $($args, )* ),
46 ClientType::Socks5(inner) => inner.$name( $($args, )* ),
47 };
48 drop(read_client);
49 match res {
50 Ok(val) => return Ok(val),
51 Err(Error::Protocol(_) | Error::AlreadySubscribed(_)) => {
52 return res;
53 },
54 Err(e) => {
55 let failed_attempts = errors.len() + 1;
56
57 if retries_exhausted(failed_attempts, $self.config.retry()) {
58 warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts);
59 return Err(Error::AllAttemptsErrored(errors));
60 }
61
62 warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry());
63
64 errors.push(e);
65
66 if let Ok(mut write_client) = $self.client_type.try_write() {
70 loop {
71 std::thread::sleep(std::time::Duration::from_secs((1 << errors.len()).min(30) as u64));
72 match ClientType::from_config(&$self.url, &$self.config) {
73 Ok(new_client) => {
74 info!("Successfully created new client");
75 *write_client = new_client;
76 break;
77 },
78 Err(e) => {
79 let failed_attempts = errors.len() + 1;
80
81 if retries_exhausted(failed_attempts, $self.config.retry()) {
82 warn!("re-creating client failed after {} attempts", failed_attempts);
83 return Err(Error::AllAttemptsErrored(errors));
84 }
85
86 warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry());
87
88 errors.push(e);
89 }
90 }
91 }
92 }
93 },
94 }
95 }}
96 }
97}
98
99fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool {
100 match u8::try_from(failed_attempts) {
101 Ok(failed_attempts) => failed_attempts > configured_retries,
102 Err(_) => true, }
104}
105
106impl ClientType {
107 pub fn from_config(url: &str, config: &Config) -> Result<Self, Error> {
110 if url.starts_with("ssl://") {
111 let url = url.replacen("ssl://", "", 1);
112 let client = match config.socks5() {
113 Some(socks5) => RawClient::new_proxy_ssl(
114 url.as_str(),
115 config.validate_domain(),
116 socks5,
117 config.timeout(),
118 )?,
119 None => {
120 RawClient::new_ssl(url.as_str(), config.validate_domain(), config.timeout())?
121 }
122 };
123
124 Ok(ClientType::SSL(client))
125 } else {
126 let url = url.replacen("tcp://", "", 1);
127
128 Ok(match config.socks5().as_ref() {
129 None => ClientType::TCP(RawClient::new(url.as_str(), config.timeout())?),
130 Some(socks5) => ClientType::Socks5(RawClient::new_proxy(
131 url.as_str(),
132 socks5,
133 config.timeout(),
134 )?),
135 })
136 }
137 }
138}
139
140impl Client {
141 pub fn new(url: &str) -> Result<Self, Error> {
151 Self::from_config(url, Config::default())
152 }
153
154 pub fn from_config(url: &str, config: Config) -> Result<Self, Error> {
157 let client_type = RwLock::new(ClientType::from_config(url, &config)?);
158
159 Ok(Client {
160 client_type,
161 config,
162 url: url.to_string(),
163 })
164 }
165}
166
167impl ElectrumApi for Client {
168 #[inline]
169 fn raw_call(
170 &self,
171 method_name: &str,
172 params: impl IntoIterator<Item = Param>,
173 ) -> Result<serde_json::Value, Error> {
174 let vec = params.into_iter().collect::<Vec<Param>>();
180 impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone());
181 }
182
183 #[inline]
184 fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
185 impl_inner_call!(self, batch_call, batch)
186 }
187
188 #[inline]
189 fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
190 impl_inner_call!(self, block_headers_subscribe_raw)
191 }
192
193 #[inline]
194 fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
195 impl_inner_call!(self, block_headers_pop_raw)
196 }
197
198 #[inline]
199 fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
200 impl_inner_call!(self, block_header_raw, height)
201 }
202
203 #[inline]
204 fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
205 impl_inner_call!(self, block_headers, start_height, count)
206 }
207
208 #[inline]
209 fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
210 impl_inner_call!(self, estimate_fee, number)
211 }
212
213 #[inline]
214 fn relay_fee(&self) -> Result<f64, Error> {
215 impl_inner_call!(self, relay_fee)
216 }
217
218 #[inline]
219 fn script_subscribe(&self, script: &ScriptPubkey) -> Result<Option<ScriptStatus>, Error> {
220 impl_inner_call!(self, script_subscribe, script)
221 }
222
223 #[inline]
224 fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
225 where
226 I: IntoIterator + Clone,
227 I::Item: Borrow<&'s ScriptPubkey>,
228 {
229 impl_inner_call!(self, batch_script_subscribe, scripts.clone())
230 }
231
232 #[inline]
233 fn script_unsubscribe(&self, script: &ScriptPubkey) -> Result<bool, Error> {
234 impl_inner_call!(self, script_unsubscribe, script)
235 }
236
237 #[inline]
238 fn script_pop(&self, script: &ScriptPubkey) -> Result<Option<ScriptStatus>, Error> {
239 impl_inner_call!(self, script_pop, script)
240 }
241
242 #[inline]
243 fn script_get_balance(&self, script: &ScriptPubkey) -> Result<GetBalanceRes, Error> {
244 impl_inner_call!(self, script_get_balance, script)
245 }
246
247 #[inline]
248 fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
249 where
250 I: IntoIterator + Clone,
251 I::Item: Borrow<&'s ScriptPubkey>,
252 {
253 impl_inner_call!(self, batch_script_get_balance, scripts.clone())
254 }
255
256 #[inline]
257 fn script_get_history(&self, script: &ScriptPubkey) -> Result<Vec<GetHistoryRes>, Error> {
258 impl_inner_call!(self, script_get_history, script)
259 }
260
261 #[inline]
262 fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
263 where
264 I: IntoIterator + Clone,
265 I::Item: Borrow<&'s ScriptPubkey>,
266 {
267 impl_inner_call!(self, batch_script_get_history, scripts.clone())
268 }
269
270 #[inline]
271 fn script_list_unspent(&self, script: &ScriptPubkey) -> Result<Vec<ListUnspentRes>, Error> {
272 impl_inner_call!(self, script_list_unspent, script)
273 }
274
275 #[inline]
276 fn batch_script_list_unspent<'s, I>(
277 &self,
278 scripts: I,
279 ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
280 where
281 I: IntoIterator + Clone,
282 I::Item: Borrow<&'s ScriptPubkey>,
283 {
284 impl_inner_call!(self, batch_script_list_unspent, scripts.clone())
285 }
286
287 fn script_get_mempool(&self, script: &ScriptPubkey) -> Result<Vec<GetMempoolRes>, Error> {
288 impl_inner_call!(self, script_get_mempool, script)
289 }
290
291 fn transaction_get_verbose(&self, txid: &Txid) -> Result<Option<TxRes>, Error> {
292 impl_inner_call!(self, transaction_get_verbose, txid)
293 }
294
295 #[inline]
296 fn transaction_get_raw(&self, txid: &Txid) -> Result<Option<Vec<u8>>, Error> {
297 impl_inner_call!(self, transaction_get_raw, txid)
298 }
299
300 #[inline]
301 fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
302 where
303 I: IntoIterator + Clone,
304 I::Item: Borrow<&'t Txid>,
305 {
306 impl_inner_call!(self, batch_transaction_get_raw, txids.clone())
307 }
308
309 #[inline]
310 fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
311 where
312 I: IntoIterator + Clone,
313 I::Item: Borrow<u32>,
314 {
315 impl_inner_call!(self, batch_block_header_raw, heights.clone())
316 }
317
318 #[inline]
319 fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
320 where
321 I: IntoIterator + Clone,
322 I::Item: Borrow<usize>,
323 {
324 impl_inner_call!(self, batch_estimate_fee, numbers.clone())
325 }
326
327 #[inline]
328 fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
329 impl_inner_call!(self, transaction_broadcast_raw, raw_tx)
330 }
331
332 #[inline]
333 fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
334 impl_inner_call!(self, transaction_get_merkle, txid, height)
335 }
336
337 #[inline]
338 fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
339 impl_inner_call!(self, txid_from_pos, height, tx_pos)
340 }
341
342 #[inline]
343 fn txid_from_pos_with_merkle(
344 &self,
345 height: usize,
346 tx_pos: usize,
347 ) -> Result<TxidFromPosRes, Error> {
348 impl_inner_call!(self, txid_from_pos_with_merkle, height, tx_pos)
349 }
350
351 #[inline]
352 fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
353 impl_inner_call!(self, server_features)
354 }
355
356 #[inline]
357 fn ping(&self) -> Result<(), Error> {
358 impl_inner_call!(self, ping)
359 }
360
361 #[inline]
362 #[cfg(feature = "debug-calls")]
363 fn calls_made(&self) -> Result<usize, Error> {
364 impl_inner_call!(self, calls_made)
365 }
366}
367
368#[cfg(test)]
369mod tests {
370 use super::*;
371
372 #[test]
373 fn more_failed_attempts_than_retries_means_exhausted() {
374 let exhausted = retries_exhausted(10, 5);
375
376 assert!(exhausted)
377 }
378
379 #[test]
380 fn failed_attempts_bigger_than_u8_means_exhausted() {
381 let failed_attempts = u8::MAX as usize + 1;
382
383 let exhausted = retries_exhausted(failed_attempts, u8::MAX);
384
385 assert!(exhausted)
386 }
387
388 #[test]
389 fn less_failed_attempts_means_not_exhausted() {
390 let exhausted = retries_exhausted(2, 5);
391
392 assert!(!exhausted)
393 }
394
395 #[test]
396 fn attempts_equals_retries_means_not_exhausted_yet() {
397 let exhausted = retries_exhausted(2, 2);
398
399 assert!(!exhausted)
400 }
401
402 #[test]
403 #[ignore]
404 fn test_local_timeout() {
405 use std::net::TcpListener;
419 use std::sync::mpsc::channel;
420 use std::time::{Duration, Instant};
421
422 let endpoint =
423 std::env::var("TEST_ELECTRUM_TIMEOUT_PORT").unwrap_or("localhost:60000".into());
424 let (sender, receiver) = channel();
425
426 std::thread::spawn(move || {
427 let listener = TcpListener::bind("127.0.0.1:60000").unwrap();
428 sender.send(()).unwrap();
429
430 for _stream in listener.incoming() {
431 std::thread::sleep(std::time::Duration::from_secs(60));
432 }
433 });
434
435 receiver
436 .recv_timeout(Duration::from_secs(5))
437 .expect("Can't start local listener");
438
439 let now = Instant::now();
440 let client = Client::from_config(
441 &endpoint,
442 crate::config::ConfigBuilder::new().timeout(Some(5)).build(),
443 );
444 let elapsed = now.elapsed();
445
446 assert!(client.is_ok());
447 assert!(elapsed > Duration::from_secs(2));
448 }
449}