1use std::{borrow::Borrow, sync::RwLock};
4
5use log::{info, warn};
6
7use bitcoin::{Script, Txid};
8
9use crate::api::ElectrumApi;
10use crate::batch::Batch;
11use crate::config::Config;
12use crate::raw_client::*;
13use crate::types::*;
14use std::convert::TryFrom;
15
16pub enum ClientType {
22 #[allow(missing_docs)]
23 TCP(RawClient<ElectrumPlaintextStream>),
24 #[allow(missing_docs)]
25 SSL(RawClient<ElectrumSslStream>),
26 #[allow(missing_docs)]
27 Socks5(RawClient<ElectrumProxyStream>),
28}
29
30pub struct Client {
33 client_type: RwLock<ClientType>,
34 config: Config,
35 url: String,
36}
37
38macro_rules! impl_inner_call {
39 ( $self:expr, $name:ident $(, $args:expr)* ) => {
40 {
41 let mut errors = vec![];
42 loop {
43 let read_client = $self.client_type.read().unwrap();
44 let res = match &*read_client {
45 ClientType::TCP(inner) => inner.$name( $($args, )* ),
46 ClientType::SSL(inner) => inner.$name( $($args, )* ),
47 ClientType::Socks5(inner) => inner.$name( $($args, )* ),
48 };
49 drop(read_client);
50 match res {
51 Ok(val) => return Ok(val),
52 Err(Error::Protocol(_) | Error::AlreadySubscribed(_)) => {
53 return res;
54 },
55 Err(e) => {
56 let failed_attempts = errors.len() + 1;
57
58 if retries_exhausted(failed_attempts, $self.config.retry()) {
59 warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts);
60 return Err(Error::AllAttemptsErrored(errors));
61 }
62
63 warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry());
64
65 errors.push(e);
66
67 if let Ok(mut write_client) = $self.client_type.try_write() {
71 loop {
72 std::thread::sleep(std::time::Duration::from_secs((1 << errors.len()).min(30) as u64));
73 match ClientType::from_config(&$self.url, &$self.config) {
74 Ok(new_client) => {
75 info!("Succesfully created new client");
76 *write_client = new_client;
77 break;
78 },
79 Err(e) => {
80 let failed_attempts = errors.len() + 1;
81
82 if retries_exhausted(failed_attempts, $self.config.retry()) {
83 warn!("re-creating client failed after {} attempts", failed_attempts);
84 return Err(Error::AllAttemptsErrored(errors));
85 }
86
87 warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry());
88
89 errors.push(e);
90 }
91 }
92 }
93 }
94 },
95 }
96 }}
97 }
98}
99
100fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool {
101 match u8::try_from(failed_attempts) {
102 Ok(failed_attempts) => failed_attempts > configured_retries,
103 Err(_) => true, }
105}
106
107impl ClientType {
108 pub fn from_config(url: &str, config: &Config) -> Result<Self, Error> {
111 if url.starts_with("ssl://") {
112 let url = url.replacen("ssl://", "", 1);
113 let client = match config.socks5() {
114 Some(socks5) => RawClient::new_proxy_ssl(
115 url.as_str(),
116 config.validate_domain(),
117 socks5,
118 config.timeout(),
119 )?,
120 None => {
121 RawClient::new_ssl(url.as_str(), config.validate_domain(), config.timeout())?
122 }
123 };
124
125 Ok(ClientType::SSL(client))
126 } else {
127 let url = url.replacen("tcp://", "", 1);
128
129 Ok(match config.socks5().as_ref() {
130 None => ClientType::TCP(RawClient::new(url.as_str(), config.timeout())?),
131 Some(socks5) => ClientType::Socks5(RawClient::new_proxy(
132 url.as_str(),
133 socks5,
134 config.timeout(),
135 )?),
136 })
137 }
138 }
139}
140
141impl Client {
142 pub fn new(url: &str) -> Result<Self, Error> {
152 Self::from_config(url, Config::default())
153 }
154
155 pub fn from_config(url: &str, config: Config) -> Result<Self, Error> {
158 let client_type = RwLock::new(ClientType::from_config(url, &config)?);
159
160 Ok(Client {
161 client_type,
162 config,
163 url: url.to_string(),
164 })
165 }
166}
167
168impl ElectrumApi for Client {
169 #[inline]
170 fn raw_call(
171 &self,
172 method_name: &str,
173 params: impl IntoIterator<Item = Param>,
174 ) -> Result<serde_json::Value, Error> {
175 let vec = params.into_iter().collect::<Vec<Param>>();
181 impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone());
182 }
183
184 #[inline]
185 fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
186 impl_inner_call!(self, batch_call, batch)
187 }
188
189 #[inline]
190 fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
191 impl_inner_call!(self, block_headers_subscribe_raw)
192 }
193
194 #[inline]
195 fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
196 impl_inner_call!(self, block_headers_pop_raw)
197 }
198
199 #[inline]
200 fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
201 impl_inner_call!(self, block_header_raw, height)
202 }
203
204 #[inline]
205 fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
206 impl_inner_call!(self, block_headers, start_height, count)
207 }
208
209 #[inline]
210 fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
211 impl_inner_call!(self, estimate_fee, number)
212 }
213
214 #[inline]
215 fn relay_fee(&self) -> Result<f64, Error> {
216 impl_inner_call!(self, relay_fee)
217 }
218
219 #[inline]
220 fn script_subscribe(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
221 impl_inner_call!(self, script_subscribe, script)
222 }
223
224 #[inline]
225 fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
226 where
227 I: IntoIterator + Clone,
228 I::Item: Borrow<&'s Script>,
229 {
230 impl_inner_call!(self, batch_script_subscribe, scripts.clone())
231 }
232
233 #[inline]
234 fn script_unsubscribe(&self, script: &Script) -> Result<bool, Error> {
235 impl_inner_call!(self, script_unsubscribe, script)
236 }
237
238 #[inline]
239 fn script_pop(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
240 impl_inner_call!(self, script_pop, script)
241 }
242
243 #[inline]
244 fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes, Error> {
245 impl_inner_call!(self, script_get_balance, script)
246 }
247
248 #[inline]
249 fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
250 where
251 I: IntoIterator + Clone,
252 I::Item: Borrow<&'s Script>,
253 {
254 impl_inner_call!(self, batch_script_get_balance, scripts.clone())
255 }
256
257 #[inline]
258 fn script_get_history(&self, script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
259 impl_inner_call!(self, script_get_history, script)
260 }
261
262 #[inline]
263 fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
264 where
265 I: IntoIterator + Clone,
266 I::Item: Borrow<&'s Script>,
267 {
268 impl_inner_call!(self, batch_script_get_history, scripts.clone())
269 }
270
271 #[inline]
272 fn script_list_unspent(&self, script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
273 impl_inner_call!(self, script_list_unspent, script)
274 }
275
276 #[inline]
277 fn batch_script_list_unspent<'s, I>(
278 &self,
279 scripts: I,
280 ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
281 where
282 I: IntoIterator + Clone,
283 I::Item: Borrow<&'s Script>,
284 {
285 impl_inner_call!(self, batch_script_list_unspent, scripts.clone())
286 }
287
288 #[inline]
289 fn transaction_get_raw(&self, txid: &Txid) -> Result<Vec<u8>, Error> {
290 impl_inner_call!(self, transaction_get_raw, txid)
291 }
292
293 #[inline]
294 fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
295 where
296 I: IntoIterator + Clone,
297 I::Item: Borrow<&'t Txid>,
298 {
299 impl_inner_call!(self, batch_transaction_get_raw, txids.clone())
300 }
301
302 #[inline]
303 fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
304 where
305 I: IntoIterator + Clone,
306 I::Item: Borrow<u32>,
307 {
308 impl_inner_call!(self, batch_block_header_raw, heights.clone())
309 }
310
311 #[inline]
312 fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
313 where
314 I: IntoIterator + Clone,
315 I::Item: Borrow<usize>,
316 {
317 impl_inner_call!(self, batch_estimate_fee, numbers.clone())
318 }
319
320 #[inline]
321 fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
322 impl_inner_call!(self, transaction_broadcast_raw, raw_tx)
323 }
324
325 #[inline]
326 fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
327 impl_inner_call!(self, transaction_get_merkle, txid, height)
328 }
329
330 #[inline]
331 fn batch_transaction_get_merkle<I>(
332 &self,
333 txids_and_heights: I,
334 ) -> Result<Vec<GetMerkleRes>, Error>
335 where
336 I: IntoIterator + Clone,
337 I::Item: Borrow<(Txid, usize)>,
338 {
339 impl_inner_call!(
340 self,
341 batch_transaction_get_merkle,
342 txids_and_heights.clone()
343 )
344 }
345
346 #[inline]
347 fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
348 impl_inner_call!(self, txid_from_pos, height, tx_pos)
349 }
350
351 #[inline]
352 fn txid_from_pos_with_merkle(
353 &self,
354 height: usize,
355 tx_pos: usize,
356 ) -> Result<TxidFromPosRes, Error> {
357 impl_inner_call!(self, txid_from_pos_with_merkle, height, tx_pos)
358 }
359
360 #[inline]
361 fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
362 impl_inner_call!(self, server_features)
363 }
364
365 #[inline]
366 fn ping(&self) -> Result<(), Error> {
367 impl_inner_call!(self, ping)
368 }
369
370 #[inline]
371 #[cfg(feature = "debug-calls")]
372 fn calls_made(&self) -> Result<usize, Error> {
373 impl_inner_call!(self, calls_made)
374 }
375}
376
377#[cfg(test)]
378mod tests {
379 use super::*;
380
381 #[test]
382 fn more_failed_attempts_than_retries_means_exhausted() {
383 let exhausted = retries_exhausted(10, 5);
384
385 assert!(exhausted)
386 }
387
388 #[test]
389 fn failed_attempts_bigger_than_u8_means_exhausted() {
390 let failed_attempts = u8::MAX as usize + 1;
391
392 let exhausted = retries_exhausted(failed_attempts, u8::MAX);
393
394 assert!(exhausted)
395 }
396
397 #[test]
398 fn less_failed_attempts_means_not_exhausted() {
399 let exhausted = retries_exhausted(2, 5);
400
401 assert!(!exhausted)
402 }
403
404 #[test]
405 fn attempts_equals_retries_means_not_exhausted_yet() {
406 let exhausted = retries_exhausted(2, 2);
407
408 assert!(!exhausted)
409 }
410
411 #[test]
412 #[ignore]
413 fn test_local_timeout() {
414 use std::net::TcpListener;
428 use std::sync::mpsc::channel;
429 use std::time::{Duration, Instant};
430
431 let endpoint =
432 std::env::var("TEST_ELECTRUM_TIMEOUT_PORT").unwrap_or("localhost:60000".into());
433 let (sender, receiver) = channel();
434
435 std::thread::spawn(move || {
436 let listener = TcpListener::bind("127.0.0.1:60000").unwrap();
437 sender.send(()).unwrap();
438
439 for _stream in listener.incoming() {
440 std::thread::sleep(Duration::from_secs(60))
441 }
442 });
443
444 receiver
445 .recv_timeout(Duration::from_secs(5))
446 .expect("Can't start local listener");
447
448 let now = Instant::now();
449 let client = Client::from_config(
450 &endpoint,
451 crate::config::ConfigBuilder::new().timeout(Some(5)).build(),
452 );
453 let elapsed = now.elapsed();
454
455 assert!(client.is_ok());
456 assert!(elapsed > Duration::from_secs(2));
457 }
458}