electrum_client/
client.rs

1//! Electrum Client
2
3use std::{borrow::Borrow, sync::RwLock};
4
5use log::{info, warn};
6
7use bitcoin::{Script, Txid};
8
9use crate::api::ElectrumApi;
10use crate::batch::Batch;
11use crate::config::Config;
12use crate::raw_client::*;
13use crate::types::*;
14use std::convert::TryFrom;
15
16/// Generalized Electrum client that supports multiple backends. This wraps
17/// [`RawClient`](client/struct.RawClient.html) and provides a more user-friendly
18/// constructor that can choose the right backend based on the url prefix.
19///
20/// **This is available only with the `default` features, or if `proxy` and one ssl implementation are enabled**
21pub enum ClientType {
22    #[allow(missing_docs)]
23    TCP(RawClient<ElectrumPlaintextStream>),
24    #[allow(missing_docs)]
25    SSL(RawClient<ElectrumSslStream>),
26    #[allow(missing_docs)]
27    Socks5(RawClient<ElectrumProxyStream>),
28}
29
30/// Generalized Electrum client that supports multiple backends. Can re-instantiate client_type if connections
31/// drops
32pub struct Client {
33    client_type: RwLock<ClientType>,
34    config: Config,
35    url: String,
36}
37
/// Seconds to wait before trying to re-create the client after `failed_attempts` failures.
///
/// The delay doubles with every failure (2s, 4s, 8s, 16s) and is capped at 30s. The exponent
/// is clamped *before* shifting, so a large failure count can neither overflow the shift nor
/// turn into a bogus, enormous delay.
fn reconnect_delay_secs(failed_attempts: usize) -> u64 {
    // `1 << 5` already exceeds the 30s cap, so larger exponents cannot change the result.
    (1u64 << failed_attempts.min(5)).min(30)
}

// Forwards the method `$name` (with `$args`) to whichever backend is currently stored in
// `$self.client_type` and returns from the *enclosing function* with the outcome.
//
// - `Ok` results, `Error::Protocol` and `Error::AlreadySubscribed` are returned as they are.
// - Any other error is recorded and the call is attempted again, after trying to replace
//   the backend with a new one created from `$self.url` and `$self.config`.
// - Once `retries_exhausted` reports that `$self.config.retry()` has been used up, every
//   error seen so far, including the one of the last attempt, is returned inside
//   `Error::AllAttemptsErrored`.
//
// `$args` are evaluated again on every attempt, so callers must pass expressions that can
// be evaluated repeatedly (references, `Copy` values or `.clone()` calls).
macro_rules! impl_inner_call {
    ( $self:expr, $name:ident $(, $args:expr)* ) => {
    {
        let mut errors = vec![];
        loop {
            // Panics only if another thread panicked while holding the lock.
            let read_client = $self.client_type.read().unwrap();
            let res = match &*read_client {
                ClientType::TCP(inner) => inner.$name( $($args, )* ),
                ClientType::SSL(inner) => inner.$name( $($args, )* ),
                ClientType::Socks5(inner) => inner.$name( $($args, )* ),
            };
            // Release the read lock before (possibly) asking for the write lock below.
            drop(read_client);
            match res {
                Ok(val) => return Ok(val),
                Err(Error::Protocol(_) | Error::AlreadySubscribed(_)) => {
                    return res;
                },
                Err(e) => {
                    let failed_attempts = errors.len() + 1;

                    if retries_exhausted(failed_attempts, $self.config.retry()) {
                        warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts);
                        // Keep the error of the final attempt, too: without it the
                        // report would be incomplete (empty when no retry is configured).
                        errors.push(e);
                        return Err(Error::AllAttemptsErrored(errors));
                    }

                    warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry());

                    errors.push(e);

                    // Only one thread will try to recreate the client getting the write lock,
                    // other eventual threads will get Err and will block at the beginning of
                    // previous loop when trying to read()
                    if let Ok(mut write_client) = $self.client_type.try_write() {
                        loop {
                            // Back off a little longer after every failure.
                            std::thread::sleep(std::time::Duration::from_secs(reconnect_delay_secs(errors.len())));
                            match ClientType::from_config(&$self.url, &$self.config) {
                                Ok(new_client) => {
                                    info!("Succesfully created new client");
                                    *write_client = new_client;
                                    break;
                                },
                                Err(e) => {
                                    let failed_attempts = errors.len() + 1;

                                    if retries_exhausted(failed_attempts, $self.config.retry()) {
                                        warn!("re-creating client failed after {} attempts", failed_attempts);
                                        // Same as above: do not lose the last error.
                                        errors.push(e);
                                        return Err(Error::AllAttemptsErrored(errors));
                                    }

                                    warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry());

                                    errors.push(e);
                                }
                            }
                        }
                    }
                },
            }
        }}
    }
}
99
/// Tells whether `failed_attempts` failures have used up the `configured_retries` budget.
///
/// The budget counts as spent only when the number of failures is strictly greater than
/// `configured_retries`. A failure count that does not even fit into a `u8` is necessarily
/// above any configurable budget.
fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool {
    u8::try_from(failed_attempts).map_or(true, |attempts| attempts > configured_retries)
}
106
107impl ClientType {
108    /// Constructor that supports multiple backends and allows configuration through
109    /// the [Config]
110    pub fn from_config(url: &str, config: &Config) -> Result<Self, Error> {
111        if url.starts_with("ssl://") {
112            let url = url.replacen("ssl://", "", 1);
113            let client = match config.socks5() {
114                Some(socks5) => RawClient::new_proxy_ssl(
115                    url.as_str(),
116                    config.validate_domain(),
117                    socks5,
118                    config.timeout(),
119                )?,
120                None => {
121                    RawClient::new_ssl(url.as_str(), config.validate_domain(), config.timeout())?
122                }
123            };
124
125            Ok(ClientType::SSL(client))
126        } else {
127            let url = url.replacen("tcp://", "", 1);
128
129            Ok(match config.socks5().as_ref() {
130                None => ClientType::TCP(RawClient::new(url.as_str(), config.timeout())?),
131                Some(socks5) => ClientType::Socks5(RawClient::new_proxy(
132                    url.as_str(),
133                    socks5,
134                    config.timeout(),
135                )?),
136            })
137        }
138    }
139}
140
141impl Client {
142    /// Default constructor supporting multiple backends by providing a prefix
143    ///
144    /// Supported prefixes are:
145    /// - tcp:// for a TCP plaintext client.
146    /// - ssl:// for an SSL-encrypted client. The server certificate will be verified.
147    ///
148    /// If no prefix is specified, then `tcp://` is assumed.
149    ///
150    /// See [Client::from_config] for more configuration options
151    pub fn new(url: &str) -> Result<Self, Error> {
152        Self::from_config(url, Config::default())
153    }
154
155    /// Generic constructor that supports multiple backends and allows configuration through
156    /// the [Config]
157    pub fn from_config(url: &str, config: Config) -> Result<Self, Error> {
158        let client_type = RwLock::new(ClientType::from_config(url, &config)?);
159
160        Ok(Client {
161            client_type,
162            config,
163            url: url.to_string(),
164        })
165    }
166}
167
168impl ElectrumApi for Client {
169    #[inline]
170    fn raw_call(
171        &self,
172        method_name: &str,
173        params: impl IntoIterator<Item = Param>,
174    ) -> Result<serde_json::Value, Error> {
175        // We can't passthrough this method to the inner client because it would require the
176        // `params` argument to also be `Copy` (because it's used multiple times for multiple
177        // retries). To avoid adding this extra trait bound we instead re-direct this call to the internal
178        // `RawClient::internal_raw_call_with_vec` method.
179
180        let vec = params.into_iter().collect::<Vec<Param>>();
181        impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone());
182    }
183
184    #[inline]
185    fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
186        impl_inner_call!(self, batch_call, batch)
187    }
188
189    #[inline]
190    fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
191        impl_inner_call!(self, block_headers_subscribe_raw)
192    }
193
194    #[inline]
195    fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
196        impl_inner_call!(self, block_headers_pop_raw)
197    }
198
199    #[inline]
200    fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
201        impl_inner_call!(self, block_header_raw, height)
202    }
203
204    #[inline]
205    fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
206        impl_inner_call!(self, block_headers, start_height, count)
207    }
208
209    #[inline]
210    fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
211        impl_inner_call!(self, estimate_fee, number)
212    }
213
214    #[inline]
215    fn relay_fee(&self) -> Result<f64, Error> {
216        impl_inner_call!(self, relay_fee)
217    }
218
219    #[inline]
220    fn script_subscribe(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
221        impl_inner_call!(self, script_subscribe, script)
222    }
223
224    #[inline]
225    fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
226    where
227        I: IntoIterator + Clone,
228        I::Item: Borrow<&'s Script>,
229    {
230        impl_inner_call!(self, batch_script_subscribe, scripts.clone())
231    }
232
233    #[inline]
234    fn script_unsubscribe(&self, script: &Script) -> Result<bool, Error> {
235        impl_inner_call!(self, script_unsubscribe, script)
236    }
237
238    #[inline]
239    fn script_pop(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
240        impl_inner_call!(self, script_pop, script)
241    }
242
243    #[inline]
244    fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes, Error> {
245        impl_inner_call!(self, script_get_balance, script)
246    }
247
248    #[inline]
249    fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
250    where
251        I: IntoIterator + Clone,
252        I::Item: Borrow<&'s Script>,
253    {
254        impl_inner_call!(self, batch_script_get_balance, scripts.clone())
255    }
256
257    #[inline]
258    fn script_get_history(&self, script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
259        impl_inner_call!(self, script_get_history, script)
260    }
261
262    #[inline]
263    fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
264    where
265        I: IntoIterator + Clone,
266        I::Item: Borrow<&'s Script>,
267    {
268        impl_inner_call!(self, batch_script_get_history, scripts.clone())
269    }
270
271    #[inline]
272    fn script_list_unspent(&self, script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
273        impl_inner_call!(self, script_list_unspent, script)
274    }
275
276    #[inline]
277    fn batch_script_list_unspent<'s, I>(
278        &self,
279        scripts: I,
280    ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
281    where
282        I: IntoIterator + Clone,
283        I::Item: Borrow<&'s Script>,
284    {
285        impl_inner_call!(self, batch_script_list_unspent, scripts.clone())
286    }
287
288    #[inline]
289    fn transaction_get_raw(&self, txid: &Txid) -> Result<Vec<u8>, Error> {
290        impl_inner_call!(self, transaction_get_raw, txid)
291    }
292
293    #[inline]
294    fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
295    where
296        I: IntoIterator + Clone,
297        I::Item: Borrow<&'t Txid>,
298    {
299        impl_inner_call!(self, batch_transaction_get_raw, txids.clone())
300    }
301
302    #[inline]
303    fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
304    where
305        I: IntoIterator + Clone,
306        I::Item: Borrow<u32>,
307    {
308        impl_inner_call!(self, batch_block_header_raw, heights.clone())
309    }
310
311    #[inline]
312    fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
313    where
314        I: IntoIterator + Clone,
315        I::Item: Borrow<usize>,
316    {
317        impl_inner_call!(self, batch_estimate_fee, numbers.clone())
318    }
319
320    #[inline]
321    fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
322        impl_inner_call!(self, transaction_broadcast_raw, raw_tx)
323    }
324
325    #[inline]
326    fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
327        impl_inner_call!(self, transaction_get_merkle, txid, height)
328    }
329
330    #[inline]
331    fn batch_transaction_get_merkle<I>(
332        &self,
333        txids_and_heights: I,
334    ) -> Result<Vec<GetMerkleRes>, Error>
335    where
336        I: IntoIterator + Clone,
337        I::Item: Borrow<(Txid, usize)>,
338    {
339        impl_inner_call!(
340            self,
341            batch_transaction_get_merkle,
342            txids_and_heights.clone()
343        )
344    }
345
346    #[inline]
347    fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
348        impl_inner_call!(self, txid_from_pos, height, tx_pos)
349    }
350
351    #[inline]
352    fn txid_from_pos_with_merkle(
353        &self,
354        height: usize,
355        tx_pos: usize,
356    ) -> Result<TxidFromPosRes, Error> {
357        impl_inner_call!(self, txid_from_pos_with_merkle, height, tx_pos)
358    }
359
360    #[inline]
361    fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
362        impl_inner_call!(self, server_features)
363    }
364
365    #[inline]
366    fn ping(&self) -> Result<(), Error> {
367        impl_inner_call!(self, ping)
368    }
369
370    #[inline]
371    #[cfg(feature = "debug-calls")]
372    fn calls_made(&self) -> Result<usize, Error> {
373        impl_inner_call!(self, calls_made)
374    }
375}
376
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn more_failed_attempts_than_retries_means_exhausted() {
        let exhausted = retries_exhausted(10, 5);

        assert!(exhausted)
    }

    #[test]
    fn failed_attempts_bigger_than_u8_means_exhausted() {
        // One more than the biggest value the retry setting can hold.
        let failed_attempts = u8::MAX as usize + 1;

        let exhausted = retries_exhausted(failed_attempts, u8::MAX);

        assert!(exhausted)
    }

    #[test]
    fn less_failed_attempts_means_not_exhausted() {
        let exhausted = retries_exhausted(2, 5);

        assert!(!exhausted)
    }

    #[test]
    fn attempts_equals_retries_means_not_exhausted_yet() {
        let exhausted = retries_exhausted(2, 2);

        assert!(!exhausted)
    }

    #[test]
    #[ignore]
    fn test_local_timeout() {
        // This test assumes a couple things:
        // - that `localhost` is resolved to two IP addresses, `127.0.0.1` and `::1` (with the v6
        //   one having higher priority)
        // - that the system silently drops packets to `[::1]:60000`, or to the port of the
        //   endpoint given through `TEST_ELECTRUM_TIMEOUT_PORT`. Despite its name, the variable
        //   holds the whole `host:port` endpoint the client connects to.
        //
        //   this can be setup with: ip6tables -I INPUT 1 -p tcp -d ::1 --dport 60000 -j DROP
        //   and removed with:       ip6tables -D INPUT -p tcp -d ::1 --dport 60000 -j DROP
        //
        // The test tries to create a client to `localhost` and expects it to succeed, but only
        // after at least 2 seconds have passed which is roughly the timeout time for the first
        // try.

        use std::net::TcpListener;
        use std::sync::mpsc::channel;
        use std::time::{Duration, Instant};

        let endpoint = std::env::var("TEST_ELECTRUM_TIMEOUT_PORT")
            .unwrap_or_else(|_| "localhost:60000".into());

        // The v4 listener must sit on the very port the client is going to use, otherwise
        // the fallback connection can't succeed when a custom endpoint is configured.
        let listen_addr = endpoint
            .rsplit_once(':')
            .map(|(_, port)| format!("127.0.0.1:{}", port))
            .expect("endpoint must have the form `host:port`");

        let (sender, receiver) = channel();

        std::thread::spawn(move || {
            let listener = TcpListener::bind(listen_addr.as_str()).unwrap();
            sender.send(()).unwrap();

            // Accept connections, then just sit there for a while.
            for _stream in listener.incoming() {
                std::thread::sleep(Duration::from_secs(60))
            }
        });

        receiver
            .recv_timeout(Duration::from_secs(5))
            .expect("Can't start local listener");

        let now = Instant::now();
        let client = Client::from_config(
            &endpoint,
            crate::config::ConfigBuilder::new().timeout(Some(5)).build(),
        );
        let elapsed = now.elapsed();

        assert!(client.is_ok());
        assert!(elapsed > Duration::from_secs(2));
    }
}