electrum-client-netagnostic 0.21.2

Electrum client library that is network agnostic. Supports plaintext, TLS, WebSocket and Onion servers.
Documentation
//! Electrum Client

use std::sync::RwLock;

use log::{info, warn};

use crate::api::ElectrumApi;
use crate::batch::Batch;
use crate::config::Config;
use crate::raw_client::*;
use crate::types::*;
use std::convert::TryFrom;

/// Generalized Electrum client that supports multiple backends. This wraps
/// [`RawClient`](client/struct.RawClient.html) and provides a more user-friendly
/// constructor that can choose the right backend based on the url prefix.
///
/// **This is available only with the `default` features, or if `proxy` and one ssl implementation are enabled**
pub enum ClientType {
    #[allow(missing_docs)]
    TCP(RawClient<ElectrumPlaintextStream>),
    #[allow(missing_docs)]
    SSL(RawClient<ElectrumSslStream>),
    #[allow(missing_docs)]
    Socks5(RawClient<ElectrumProxyStream>),
    #[cfg(feature = "use-websocket")]
    #[allow(missing_docs)]
    WS(RawClient<ElectrumWsStream>),
    #[cfg(all(
        feature = "use-websocket",
        any(feature = "use-rustls", feature = "use-rustls-ring"),
        not(feature = "use-openssl")
    ))]
    #[allow(missing_docs)]
    WSS(RawClient<ElectrumWssStream>),
}

/// Generalized Electrum client that supports multiple backends. Can re-instantiate client_type if connections
/// drops
pub struct Client {
    client_type: RwLock<ClientType>,
    config: Config,
    url: String,
}

macro_rules! impl_inner_call {
    ( $self:expr, $name:ident $(, $args:expr)* ) => {
    {
        let mut errors = vec![];
        loop {
            let read_client = $self.client_type.read().unwrap();
            let res = match &*read_client {
                ClientType::TCP(inner) => inner.$name( $($args, )* ),
                ClientType::SSL(inner) => inner.$name( $($args, )* ),
                ClientType::Socks5(inner) => inner.$name( $($args, )* ),
                #[cfg(feature = "use-websocket")]
                ClientType::WS(inner) => inner.$name( $($args, )* ),
                #[cfg(all(
                    feature = "use-websocket",
                    any(feature = "use-rustls", feature = "use-rustls-ring"),
                    not(feature = "use-openssl")
                ))]
                ClientType::WSS(inner) => inner.$name( $($args, )* ),
            };
            drop(read_client);
            match res {
                Ok(val) => return Ok(val),
                Err(Error::Protocol(_)) => {
                    return res;
                },
                Err(e) => {
                    let failed_attempts = errors.len() + 1;

                    if retries_exhausted(failed_attempts, $self.config.retry()) {
                        warn!("call '{}' failed after {} attempts", stringify!($name), failed_attempts);
                        return Err(Error::AllAttemptsErrored(errors));
                    }

                    warn!("call '{}' failed with {}, retry: {}/{}", stringify!($name), e, failed_attempts, $self.config.retry());

                    errors.push(e);

                    // Only one thread will try to recreate the client getting the write lock,
                    // other eventual threads will get Err and will block at the beginning of
                    // previous loop when trying to read()
                    if let Ok(mut write_client) = $self.client_type.try_write() {
                        loop {
                            std::thread::sleep(std::time::Duration::from_secs((1 << errors.len()).min(30) as u64));
                            match ClientType::from_config(&$self.url, &$self.config) {
                                Ok(new_client) => {
                                    info!("Succesfully created new client");
                                    *write_client = new_client;
                                    break;
                                },
                                Err(e) => {
                                    let failed_attempts = errors.len() + 1;

                                    if retries_exhausted(failed_attempts, $self.config.retry()) {
                                        warn!("re-creating client failed after {} attempts", failed_attempts);
                                        return Err(Error::AllAttemptsErrored(errors));
                                    }

                                    warn!("re-creating client failed with {}, retry: {}/{}", e, failed_attempts, $self.config.retry());

                                    errors.push(e);
                                }
                            }
                        }
                    }
                },
            }
        }}
    }
}

fn retries_exhausted(failed_attempts: usize, configured_retries: u8) -> bool {
    match u8::try_from(failed_attempts) {
        Ok(failed_attempts) => failed_attempts > configured_retries,
        Err(_) => true, // if the usize doesn't fit into a u8, we definitely exhausted our retries
    }
}

impl ClientType {
    /// Constructor that supports multiple backends and allows configuration through
    /// the [Config]
    pub fn from_config(url: &str, config: &Config) -> Result<Self, Error> {
        if url.starts_with("ssl://") {
            let url = url.replacen("ssl://", "", 1);
            let client = match config.socks5() {
                Some(socks5) => RawClient::new_proxy_ssl(
                    url.as_str(),
                    config.validate_domain(),
                    socks5,
                    config.timeout(),
                )?,
                None => {
                    RawClient::new_ssl(url.as_str(), config.validate_domain(), config.timeout())?
                }
            };

            Ok(ClientType::SSL(client))
        } else if url.starts_with("wss://") {
            #[cfg(all(
                feature = "use-websocket",
                any(feature = "use-rustls", feature = "use-rustls-ring"),
                not(feature = "use-openssl")
            ))]
            {
                let url = url.replacen("wss://", "", 1);
                let client = RawClient::new_wss(
                    url.as_str(),
                    config.validate_domain(),
                    config.timeout(),
                    config.max_message_size(),
                )?;
                Ok(ClientType::WSS(client))
            }
            #[cfg(not(all(
                feature = "use-websocket",
                any(feature = "use-rustls", feature = "use-rustls-ring"),
                not(feature = "use-openssl")
            )))]
            {
                Err(Error::Message(
                    "WSS support requires the 'use-websocket' feature and a rustls feature"
                        .to_string(),
                ))
            }
        } else if url.starts_with("ws://") {
            #[cfg(feature = "use-websocket")]
            {
                let url = url.replacen("ws://", "", 1);
                let client =
                    RawClient::new_ws(url.as_str(), config.timeout(), config.max_message_size())?;
                Ok(ClientType::WS(client))
            }
            #[cfg(not(feature = "use-websocket"))]
            {
                Err(Error::Message(
                    "WebSocket support requires the 'use-websocket' feature".to_string(),
                ))
            }
        } else {
            let url = url.replacen("tcp://", "", 1);

            Ok(match config.socks5().as_ref() {
                None => ClientType::TCP(RawClient::new(url.as_str(), config.timeout())?),
                Some(socks5) => ClientType::Socks5(RawClient::new_proxy(
                    url.as_str(),
                    socks5,
                    config.timeout(),
                )?),
            })
        }
    }
}

impl Client {
    /// Default constructor supporting multiple backends by providing a prefix
    ///
    /// Supported prefixes are:
    /// - tcp:// for a TCP plaintext client.
    /// - ssl:// for an SSL-encrypted client. The server certificate will be verified.
    /// - ws:// for a WebSocket client (requires `use-websocket` feature).
    /// - wss:// for a secure WebSocket client (requires `use-websocket` and a rustls feature).
    ///
    /// If no prefix is specified, then `tcp://` is assumed.
    ///
    /// See [Client::from_config] for more configuration options
    ///
    pub fn new(url: &str) -> Result<Self, Error> {
        Self::from_config(url, Config::default())
    }

    /// Generic constructor that supports multiple backends and allows configuration through
    /// the [Config]
    pub fn from_config(url: &str, config: Config) -> Result<Self, Error> {
        let client_type = RwLock::new(ClientType::from_config(url, &config)?);

        Ok(Client {
            client_type,
            config,
            url: url.to_string(),
        })
    }
}

impl ElectrumApi for Client {
    #[inline]
    fn raw_call(
        &self,
        method_name: &str,
        params: impl IntoIterator<Item = Param>,
    ) -> Result<serde_json::Value, Error> {
        // We can't passthrough this method to the inner client because it would require the
        // `params` argument to also be `Copy` (because it's used multiple times for multiple
        // retries). To avoid adding this extra trait bound we instead re-direct this call to the internal
        // `RawClient::internal_raw_call_with_vec` method.

        let vec = params.into_iter().collect::<Vec<Param>>();
        impl_inner_call!(self, internal_raw_call_with_vec, method_name, vec.clone());
    }

    #[inline]
    fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
        impl_inner_call!(self, batch_call, batch)
    }

    #[inline]
    fn ping(&self) -> Result<(), Error> {
        impl_inner_call!(self, ping)
    }

    #[inline]
    #[cfg(feature = "debug-calls")]
    fn calls_made(&self) -> Result<usize, Error> {
        impl_inner_call!(self, calls_made)
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[test]
    fn more_failed_attempts_than_retries_means_exhausted() {
        let exhausted = retries_exhausted(10, 5);

        assert_eq!(exhausted, true)
    }

    #[test]
    fn failed_attempts_bigger_than_u8_means_exhausted() {
        let failed_attempts = u8::MAX as usize + 1;

        let exhausted = retries_exhausted(failed_attempts, u8::MAX);

        assert_eq!(exhausted, true)
    }

    #[test]
    fn less_failed_attempts_means_not_exhausted() {
        let exhausted = retries_exhausted(2, 5);

        assert_eq!(exhausted, false)
    }

    #[test]
    fn attempts_equals_retries_means_not_exhausted_yet() {
        let exhausted = retries_exhausted(2, 2);

        assert_eq!(exhausted, false)
    }

    #[test]
    #[ignore]
    fn test_local_timeout() {
        // This test assumes a couple things:
        // - that `localhost` is resolved to two IP addresses, `127.0.0.1` and `::1` (with the v6
        //   one having higher priority)
        // - that the system silently drops packets to `[::1]:60000` or a different port if
        //   specified through `TEST_ELECTRUM_TIMEOUT_PORT`
        //
        //   this can be setup with: ip6tables -I INPUT 1 -p tcp -d ::1 --dport 60000 -j DROP
        //   and removed with:       ip6tables -D INPUT -p tcp -d ::1 --dport 60000 -j DROP
        //
        // The test tries to create a client to `localhost` and expects it to succeed, but only
        // after at least 2 seconds have passed which is roughly the timeout time for the first
        // try.

        use std::net::TcpListener;
        use std::sync::mpsc::channel;
        use std::time::{Duration, Instant};

        let endpoint =
            std::env::var("TEST_ELECTRUM_TIMEOUT_PORT").unwrap_or("localhost:60000".into());
        let (sender, receiver) = channel();

        std::thread::spawn(move || {
            let listener = TcpListener::bind("127.0.0.1:60000").unwrap();
            sender.send(()).unwrap();

            for _stream in listener.incoming() {
                loop {}
            }
        });

        receiver
            .recv_timeout(Duration::from_secs(5))
            .expect("Can't start local listener");

        let now = Instant::now();
        let client = Client::from_config(
            &endpoint,
            crate::config::ConfigBuilder::new()
                .timeout(Some(Duration::from_secs(5)))
                .build(),
        );
        let elapsed = now.elapsed();

        assert!(client.is_ok());
        assert!(elapsed > Duration::from_secs(2));
    }
}