electrum/
client.rs

1//! Electrum Client
2
3use std::{borrow::Borrow, sync::RwLock};
4
5use log::{info, warn};
6
7use crate::api::ElectrumApi;
8use crate::batch::Batch;
9use crate::config::Config;
10use crate::raw_client::*;
11use crate::types::*;
12use bp::{ScriptPubkey, Txid};
13use std::convert::TryFrom;
14
15/// Generalized Electrum client that supports multiple backends. This wraps
16/// [`RawClient`](client/struct.RawClient.html) and provides a more user-friendly
17/// constructor that can choose the right backend based on the url prefix.
18///
19/// **This is available only with the `default` features, or if `proxy` and one ssl implementation are enabled**
20pub enum ClientType {
21    #[allow(missing_docs)]
22    TCP(RawClient<ElectrumPlaintextStream>),
23    #[allow(missing_docs)]
24    SSL(RawClient<ElectrumSslStream>),
25    #[allow(missing_docs)]
26    Socks5(RawClient<ElectrumProxyStream>),
27}
28
29/// Generalized Electrum client that supports multiple backends. Can re-instantiate client_type if connections
30/// drops
31pub struct Client {
32    client_type: RwLock<ClientType>,
33    config: Config,
34    url: String,
35}
36
// Calls `$name($args...)` on whichever backend is currently active, retrying on failure.
//
// The expansion is a block that never falls through: every path ends in a `return` from the
// enclosing function, which therefore has to return `Result<_, Error>`.
//
// * `Ok` results, `Error::Protocol` and `Error::AlreadySubscribed` are returned right away.
// * Any other error is recorded and, unless the budget given by `config.retry()` is used up,
//   the backend is re-created from `url` / `config` (after a capped exponential back-off)
//   before the call is attempted again.
// * Once the budget is used up, every recorded error -- including the one from the final
//   attempt -- is returned inside `Error::AllAttemptsErrored`.
//
// `$args` are evaluated again on every attempt, which is why callers pass clones of owned values.
macro_rules! impl_inner_call {
    ( $self:expr, $name:ident $(, $args:expr)* ) => {
    {
        let mut errors = vec![];
        loop {
            let read_client = $self.client_type.read().unwrap();
            let res = match &*read_client {
                ClientType::TCP(inner) => inner.$name( $($args, )* ),
                ClientType::SSL(inner) => inner.$name( $($args, )* ),
                ClientType::Socks5(inner) => inner.$name( $($args, )* ),
            };
            // Give the read lock back before a reconnect may ask for the write lock.
            drop(read_client);
            match res {
                Ok(val) => return Ok(val),
                Err(Error::Protocol(_) | Error::AlreadySubscribed(_)) => {
                    return res;
                },
                Err(e) => {
                    let failed_attempts = errors.len() + 1;

                    if retries_exhausted(failed_attempts, $self.config.retry()) {
                        warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts);
                        // Keep the last failure too, otherwise the caller never gets to see it
                        // (and with zero retries the list would be empty).
                        errors.push(e);
                        return Err(Error::AllAttemptsErrored(errors));
                    }

                    warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry());

                    errors.push(e);

                    // Only the thread that obtains the write lock re-creates the client. Any other
                    // thread that failed at the same time skips this part, goes back to the top
                    // of the outer loop and blocks on `read()` until the new client is in place.
                    if let Ok(mut write_client) = $self.client_type.try_write() {
                        loop {
                            // 2s, 4s, 8s, 16s, then 30s. The shift amount is clamped so the
                            // expression cannot overflow when many retries are configured.
                            let backoff_secs = (1u64 << errors.len().min(5)).min(30);
                            std::thread::sleep(std::time::Duration::from_secs(backoff_secs));
                            match ClientType::from_config(&$self.url, &$self.config) {
                                Ok(new_client) => {
                                    info!("Successfully created new client");
                                    *write_client = new_client;
                                    break;
                                },
                                Err(e) => {
                                    let failed_attempts = errors.len() + 1;

                                    if retries_exhausted(failed_attempts, $self.config.retry()) {
                                        warn!("re-creating client failed after {} attempts", failed_attempts);
                                        // Same as above: report the final failure as well.
                                        errors.push(e);
                                        return Err(Error::AllAttemptsErrored(errors));
                                    }

                                    warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry());

                                    errors.push(e);
                                }
                            }
                        }
                    }
                },
            }
        }}
    }
}
98
/// Tells whether `failed_attempts` has gone past the configured retry budget.
///
/// Returns `true` once there are strictly more failed attempts than `configured_retries`; a count
/// too large to fit in a `u8` is always past the budget.
fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool {
    u8::try_from(failed_attempts)
        .map(|attempts| attempts > configured_retries)
        // A count that does not fit into a `u8` is larger than any possible budget.
        .unwrap_or(true)
}
105
106impl ClientType {
107    /// Constructor that supports multiple backends and allows configuration through
108    /// the [Config]
109    pub fn from_config(url: &str, config: &Config) -> Result<Self, Error> {
110        if url.starts_with("ssl://") {
111            let url = url.replacen("ssl://", "", 1);
112            let client = match config.socks5() {
113                Some(socks5) => RawClient::new_proxy_ssl(
114                    url.as_str(),
115                    config.validate_domain(),
116                    socks5,
117                    config.timeout(),
118                )?,
119                None => {
120                    RawClient::new_ssl(url.as_str(), config.validate_domain(), config.timeout())?
121                }
122            };
123
124            Ok(ClientType::SSL(client))
125        } else {
126            let url = url.replacen("tcp://", "", 1);
127
128            Ok(match config.socks5().as_ref() {
129                None => ClientType::TCP(RawClient::new(url.as_str(), config.timeout())?),
130                Some(socks5) => ClientType::Socks5(RawClient::new_proxy(
131                    url.as_str(),
132                    socks5,
133                    config.timeout(),
134                )?),
135            })
136        }
137    }
138}
139
140impl Client {
141    /// Default constructor supporting multiple backends by providing a prefix
142    ///
143    /// Supported prefixes are:
144    /// - tcp:// for a TCP plaintext client.
145    /// - ssl:// for an SSL-encrypted client. The server certificate will be verified.
146    ///
147    /// If no prefix is specified, then `tcp://` is assumed.
148    ///
149    /// See [Client::from_config] for more configuration options
150    pub fn new(url: &str) -> Result<Self, Error> {
151        Self::from_config(url, Config::default())
152    }
153
154    /// Generic constructor that supports multiple backends and allows configuration through
155    /// the [Config]
156    pub fn from_config(url: &str, config: Config) -> Result<Self, Error> {
157        let client_type = RwLock::new(ClientType::from_config(url, &config)?);
158
159        Ok(Client {
160            client_type,
161            config,
162            url: url.to_string(),
163        })
164    }
165}
166
167impl ElectrumApi for Client {
168    #[inline]
169    fn raw_call(
170        &self,
171        method_name: &str,
172        params: impl IntoIterator<Item = Param>,
173    ) -> Result<serde_json::Value, Error> {
174        // We can't pass through this method to the inner client because it would require the
175        // `params` argument to also be `Copy` (because it's used multiple times for multiple
176        // retries). To avoid adding this extra trait bound we instead re-direct this call to the internal
177        // `RawClient::internal_raw_call_with_vec` method.
178
179        let vec = params.into_iter().collect::<Vec<Param>>();
180        impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone());
181    }
182
183    #[inline]
184    fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
185        impl_inner_call!(self, batch_call, batch)
186    }
187
188    #[inline]
189    fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
190        impl_inner_call!(self, block_headers_subscribe_raw)
191    }
192
193    #[inline]
194    fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
195        impl_inner_call!(self, block_headers_pop_raw)
196    }
197
198    #[inline]
199    fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
200        impl_inner_call!(self, block_header_raw, height)
201    }
202
203    #[inline]
204    fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
205        impl_inner_call!(self, block_headers, start_height, count)
206    }
207
208    #[inline]
209    fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
210        impl_inner_call!(self, estimate_fee, number)
211    }
212
213    #[inline]
214    fn relay_fee(&self) -> Result<f64, Error> {
215        impl_inner_call!(self, relay_fee)
216    }
217
218    #[inline]
219    fn script_subscribe(&self, script: &ScriptPubkey) -> Result<Option<ScriptStatus>, Error> {
220        impl_inner_call!(self, script_subscribe, script)
221    }
222
223    #[inline]
224    fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
225    where
226        I: IntoIterator + Clone,
227        I::Item: Borrow<&'s ScriptPubkey>,
228    {
229        impl_inner_call!(self, batch_script_subscribe, scripts.clone())
230    }
231
232    #[inline]
233    fn script_unsubscribe(&self, script: &ScriptPubkey) -> Result<bool, Error> {
234        impl_inner_call!(self, script_unsubscribe, script)
235    }
236
237    #[inline]
238    fn script_pop(&self, script: &ScriptPubkey) -> Result<Option<ScriptStatus>, Error> {
239        impl_inner_call!(self, script_pop, script)
240    }
241
242    #[inline]
243    fn script_get_balance(&self, script: &ScriptPubkey) -> Result<GetBalanceRes, Error> {
244        impl_inner_call!(self, script_get_balance, script)
245    }
246
247    #[inline]
248    fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
249    where
250        I: IntoIterator + Clone,
251        I::Item: Borrow<&'s ScriptPubkey>,
252    {
253        impl_inner_call!(self, batch_script_get_balance, scripts.clone())
254    }
255
256    #[inline]
257    fn script_get_history(&self, script: &ScriptPubkey) -> Result<Vec<GetHistoryRes>, Error> {
258        impl_inner_call!(self, script_get_history, script)
259    }
260
261    #[inline]
262    fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
263    where
264        I: IntoIterator + Clone,
265        I::Item: Borrow<&'s ScriptPubkey>,
266    {
267        impl_inner_call!(self, batch_script_get_history, scripts.clone())
268    }
269
270    #[inline]
271    fn script_list_unspent(&self, script: &ScriptPubkey) -> Result<Vec<ListUnspentRes>, Error> {
272        impl_inner_call!(self, script_list_unspent, script)
273    }
274
275    #[inline]
276    fn batch_script_list_unspent<'s, I>(
277        &self,
278        scripts: I,
279    ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
280    where
281        I: IntoIterator + Clone,
282        I::Item: Borrow<&'s ScriptPubkey>,
283    {
284        impl_inner_call!(self, batch_script_list_unspent, scripts.clone())
285    }
286
287    fn script_get_mempool(&self, script: &ScriptPubkey) -> Result<Vec<GetMempoolRes>, Error> {
288        impl_inner_call!(self, script_get_mempool, script)
289    }
290
291    fn transaction_get_verbose(&self, txid: &Txid) -> Result<Option<TxRes>, Error> {
292        impl_inner_call!(self, transaction_get_verbose, txid)
293    }
294
295    #[inline]
296    fn transaction_get_raw(&self, txid: &Txid) -> Result<Option<Vec<u8>>, Error> {
297        impl_inner_call!(self, transaction_get_raw, txid)
298    }
299
300    #[inline]
301    fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
302    where
303        I: IntoIterator + Clone,
304        I::Item: Borrow<&'t Txid>,
305    {
306        impl_inner_call!(self, batch_transaction_get_raw, txids.clone())
307    }
308
309    #[inline]
310    fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
311    where
312        I: IntoIterator + Clone,
313        I::Item: Borrow<u32>,
314    {
315        impl_inner_call!(self, batch_block_header_raw, heights.clone())
316    }
317
318    #[inline]
319    fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
320    where
321        I: IntoIterator + Clone,
322        I::Item: Borrow<usize>,
323    {
324        impl_inner_call!(self, batch_estimate_fee, numbers.clone())
325    }
326
327    #[inline]
328    fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
329        impl_inner_call!(self, transaction_broadcast_raw, raw_tx)
330    }
331
332    #[inline]
333    fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
334        impl_inner_call!(self, transaction_get_merkle, txid, height)
335    }
336
337    #[inline]
338    fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
339        impl_inner_call!(self, txid_from_pos, height, tx_pos)
340    }
341
342    #[inline]
343    fn txid_from_pos_with_merkle(
344        &self,
345        height: usize,
346        tx_pos: usize,
347    ) -> Result<TxidFromPosRes, Error> {
348        impl_inner_call!(self, txid_from_pos_with_merkle, height, tx_pos)
349    }
350
351    #[inline]
352    fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
353        impl_inner_call!(self, server_features)
354    }
355
356    #[inline]
357    fn ping(&self) -> Result<(), Error> {
358        impl_inner_call!(self, ping)
359    }
360
361    #[inline]
362    #[cfg(feature = "debug-calls")]
363    fn calls_made(&self) -> Result<usize, Error> {
364        impl_inner_call!(self, calls_made)
365    }
366}
367
#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn more_failed_attempts_than_retries_means_exhausted() {
        let exhausted = retries_exhausted(10, 5);

        assert!(exhausted)
    }

    #[test]
    fn failed_attempts_bigger_than_u8_means_exhausted() {
        let failed_attempts = u8::MAX as usize + 1;

        let exhausted = retries_exhausted(failed_attempts, u8::MAX);

        assert!(exhausted)
    }

    #[test]
    fn less_failed_attempts_means_not_exhausted() {
        let exhausted = retries_exhausted(2, 5);

        assert!(!exhausted)
    }

    #[test]
    fn attempts_equals_retries_means_not_exhausted_yet() {
        let exhausted = retries_exhausted(2, 2);

        assert!(!exhausted)
    }

    #[test]
    fn zero_retries_means_exhausted_after_first_failure() {
        let exhausted = retries_exhausted(1, 0);

        assert!(exhausted)
    }

    #[test]
    #[ignore]
    fn test_local_timeout() {
        // This test assumes a couple things:
        // - that `localhost` is resolved to two IP addresses, `127.0.0.1` and `::1` (with the v6
        //   one having higher priority)
        // - that the system silently drops packets to `[::1]:60000`, or to the port of the
        //   endpoint given through `TEST_ELECTRUM_TIMEOUT_PORT` (a full `host:port` endpoint,
        //   e.g. `localhost:60001`)
        //
        //   this can be setup with: ip6tables -I INPUT 1 -p tcp -d ::1 --dport 60000 -j DROP
        //   and removed with:       ip6tables -D INPUT -p tcp -d ::1 --dport 60000 -j DROP
        //
        // The test tries to create a client to `localhost` and expects it to succeed, but only
        // after at least 2 seconds have passed which is roughly the timeout time for the first
        // try.

        use std::net::TcpListener;
        use std::sync::mpsc::channel;
        use std::time::{Duration, Instant};

        let endpoint = std::env::var("TEST_ELECTRUM_TIMEOUT_PORT")
            .unwrap_or_else(|_| "localhost:60000".into());

        // The IPv4 listener has to sit on the very port the client dials, otherwise overriding
        // the endpoint leaves the fallback connection with nothing to connect to.
        let port = endpoint.rsplit(':').next().unwrap_or("60000");
        let listen_addr = format!("127.0.0.1:{}", port);

        let (sender, receiver) = channel();

        std::thread::spawn(move || {
            let listener = TcpListener::bind(listen_addr.as_str()).unwrap();
            // Tell the test body that the port is open.
            sender.send(()).unwrap();

            // Keep every accepted connection around for a minute without talking to it.
            for _stream in listener.incoming() {
                std::thread::sleep(Duration::from_secs(60));
            }
        });

        receiver
            .recv_timeout(Duration::from_secs(5))
            .expect("Can't start local listener");

        let now = Instant::now();
        let client = Client::from_config(
            &endpoint,
            crate::config::ConfigBuilder::new().timeout(Some(5)).build(),
        );
        let elapsed = now.elapsed();

        assert!(client.is_ok());
        assert!(elapsed > Duration::from_secs(2));
    }
}