electrum_client/
raw_client.rs

1//! Raw client
2//!
3//! This module contains the definition of the raw client that wraps the transport method
4
5use std::borrow::Borrow;
6use std::collections::{BTreeMap, BTreeSet, HashMap, VecDeque};
7use std::io::{BufRead, BufReader, Read, Write};
8use std::mem::drop;
9use std::net::{TcpStream, ToSocketAddrs};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::mpsc::{channel, Receiver, Sender};
12use std::sync::{Arc, Mutex, TryLockError};
13use std::time::Duration;
14
15#[allow(unused_imports)]
16use log::{debug, error, info, trace, warn};
17
18use bitcoin::consensus::encode::deserialize;
19use bitcoin::hex::{DisplayHex, FromHex};
20use bitcoin::{Script, Txid};
21
22#[cfg(feature = "use-openssl")]
23use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode};
24
25#[cfg(all(
26    any(
27        feature = "default",
28        feature = "use-rustls",
29        feature = "use-rustls-ring"
30    ),
31    not(feature = "use-openssl")
32))]
33use rustls::{
34    pki_types::ServerName,
35    pki_types::{Der, TrustAnchor},
36    ClientConfig, ClientConnection, RootCertStore, StreamOwned,
37};
38
39#[cfg(any(feature = "default", feature = "proxy"))]
40use crate::socks::{Socks5Stream, TargetAddr, ToTargetAddr};
41
42use crate::stream::ClonableStream;
43
44use crate::api::ElectrumApi;
45use crate::batch::Batch;
46use crate::types::*;
47
// Builds a `Batch` by invoking the batch method `$call` once for every item of `$data`, sends it
// through `$self.batch_call` and deserializes each returned JSON value into the element type
// expected by the caller.
//
// Accepted forms:
// * `impl_batch_call!(client, items, method)` passes `item.borrow()` to `method`;
// * `impl_batch_call!(client, items, method, apply_deref)` passes `*item.borrow()`.
//
// The expansion uses `?`, so it can only be used inside a function returning a compatible
// `Result`.
macro_rules! impl_batch_call {
    ( $self:expr, $data:expr, $call:ident ) => {{
        impl_batch_call!($self, $data, $call, )
    }};

    ( $self:expr, $data:expr, $call:ident, apply_deref ) => {{
        impl_batch_call!($self, $data, $call, *)
    }};

    ( $self:expr, $data:expr, $call:ident, $($apply_deref:tt)? ) => {{
        let mut queued = Batch::default();
        for item in $data {
            queued.$call($($apply_deref)* item.borrow());
        }

        // Stops at the first value that fails to deserialize, like a `?` inside a loop would
        let decoded = $self
            .batch_call(&queued)?
            .into_iter()
            .map(serde_json::from_value)
            .collect::<Result<Vec<_>, _>>()?;

        Ok(decoded)
    }};
}
73
/// A trait for [`ToSocketAddrs`](https://doc.rust-lang.org/std/net/trait.ToSocketAddrs.html) that
/// can also be turned into a domain. Used when an SSL client needs to validate the server's
/// certificate.
///
/// Implementors that carry no host name can rely on the provided [`domain`](Self::domain)
/// method, which reports that no domain is available.
pub trait ToSocketAddrsDomain: ToSocketAddrs {
    /// Returns the domain, if present
    ///
    /// The provided implementation always returns `None`.
    fn domain(&self) -> Option<&str> {
        None
    }
}
83
84impl ToSocketAddrsDomain for &str {
85    fn domain(&self) -> Option<&str> {
86        self.split(':').next()
87    }
88}
89
90impl ToSocketAddrsDomain for (&str, u16) {
91    fn domain(&self) -> Option<&str> {
92        self.0.domain()
93    }
94}
95
#[cfg(any(feature = "default", feature = "proxy"))]
impl ToSocketAddrsDomain for TargetAddr {
    /// Returns the host name of a `Domain` target and `None` for an `Ip` target.
    fn domain(&self) -> Option<&str> {
        match self {
            TargetAddr::Domain(host, _port) => Some(host.as_str()),
            TargetAddr::Ip(_) => None,
        }
    }
}
105
106macro_rules! impl_to_socket_addrs_domain {
107    ( $ty:ty ) => {
108        impl ToSocketAddrsDomain for $ty {}
109    };
110}
111
112impl_to_socket_addrs_domain!(std::net::SocketAddr);
113impl_to_socket_addrs_domain!(std::net::SocketAddrV4);
114impl_to_socket_addrs_domain!(std::net::SocketAddrV6);
115impl_to_socket_addrs_domain!((std::net::IpAddr, u16));
116impl_to_socket_addrs_domain!((std::net::Ipv4Addr, u16));
117impl_to_socket_addrs_domain!((std::net::Ipv6Addr, u16));
118
/// Instance of an Electrum client
///
/// A `Client` maintains a constant connection with an Electrum server and exposes methods to
/// interact with it. It can also subscribe and receive notifications from the server about new
/// blocks or activity on a specific *scriptPubKey*.
///
/// The `Client` is modeled in such a way that allows the external caller to have full control over
/// its functionality: no threads or tasks are spawned internally to monitor the state of the
/// connection.
///
/// More transport methods can be used by manually creating an instance of this struct with an
/// arbitrary `S` type.
#[derive(Debug)]
pub struct RawClient<S>
where
    S: Read + Write,
{
    // Handle used to write requests to the server
    stream: Mutex<ClonableStream<S>>,
    // Buffered reader over a clone of `stream`; whoever holds this lock is the "reader thread"
    buf_reader: Mutex<BufReader<ClonableStream<S>>>,

    // Source of request ids: every new request takes the current value and increments it
    last_id: AtomicUsize,
    // Request id -> channel used to deliver the response (or a wake-up/error) to its caller
    waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,

    // Header notifications received from the server and not yet popped by the caller
    headers: Mutex<VecDeque<RawHeaderNotification>>,
    // Per-scripthash queues of status notifications received from the server
    script_notifications: Mutex<HashMap<ScriptHash, VecDeque<ScriptStatus>>>,

    // Number of requests/batches written to the server, only tracked with `debug-calls`
    #[cfg(feature = "debug-calls")]
    calls: AtomicUsize,
}
148
149impl<S> From<S> for RawClient<S>
150where
151    S: Read + Write,
152{
153    fn from(stream: S) -> Self {
154        let stream: ClonableStream<_> = stream.into();
155
156        Self {
157            buf_reader: Mutex::new(BufReader::new(stream.clone())),
158            stream: Mutex::new(stream),
159
160            last_id: AtomicUsize::new(0),
161            waiting_map: Mutex::new(HashMap::new()),
162
163            headers: Mutex::new(VecDeque::new()),
164            script_notifications: Mutex::new(HashMap::new()),
165
166            #[cfg(feature = "debug-calls")]
167            calls: AtomicUsize::new(0),
168        }
169    }
170}
171
172/// Transport type used to establish a plaintext TCP connection with the server
173pub type ElectrumPlaintextStream = TcpStream;
174impl RawClient<ElectrumPlaintextStream> {
175    /// Creates a new plaintext client and tries to connect to `socket_addr`.
176    pub fn new<A: ToSocketAddrs>(
177        socket_addrs: A,
178        timeout: Option<Duration>,
179    ) -> Result<Self, Error> {
180        let stream = match timeout {
181            Some(timeout) => {
182                let stream = connect_with_total_timeout(socket_addrs, timeout)?;
183                stream.set_read_timeout(Some(timeout))?;
184                stream.set_write_timeout(Some(timeout))?;
185                stream
186            }
187            None => TcpStream::connect(socket_addrs)?,
188        };
189
190        Ok(stream.into())
191    }
192}
193
194fn connect_with_total_timeout<A: ToSocketAddrs>(
195    socket_addrs: A,
196    mut timeout: Duration,
197) -> Result<TcpStream, Error> {
198    // Use the same algorithm as curl: 1/2 on the first host, 1/4 on the second one, etc.
199    // https://curl.se/mail/lib-2014-11/0164.html
200
201    let mut errors = Vec::new();
202
203    let addrs = socket_addrs
204        .to_socket_addrs()?
205        .enumerate()
206        .collect::<Vec<_>>();
207    for (index, addr) in &addrs {
208        if *index < addrs.len() - 1 {
209            timeout = timeout.div_f32(2.0);
210        }
211
212        info!(
213            "Trying to connect to {} (attempt {}/{}) with timeout {:?}",
214            addr,
215            index + 1,
216            addrs.len(),
217            timeout
218        );
219        match TcpStream::connect_timeout(addr, timeout) {
220            Ok(socket) => return Ok(socket),
221            Err(e) => {
222                warn!("Connection error: {:?}", e);
223                errors.push(e.into());
224            }
225        }
226    }
227
228    Err(Error::AllAttemptsErrored(errors))
229}
230
#[cfg(feature = "use-openssl")]
/// Transport type used to establish an OpenSSL TLS encrypted/authenticated connection with the server
pub type ElectrumSslStream = SslStream<TcpStream>;
#[cfg(feature = "use-openssl")]
impl RawClient<ElectrumSslStream> {
    /// Creates a new SSL client and tries to connect to `socket_addrs`. Optionally, if
    /// `validate_domain` is `true`, validate the server's certificate.
    ///
    /// Fails with `Error::MissingDomain` before connecting when validation is requested but
    /// `socket_addrs` has no domain. When `timeout` is set, it bounds the TCP connection phase
    /// and is installed as the read and write timeout of the underlying TCP stream.
    pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
        socket_addrs: A,
        validate_domain: bool,
        timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        debug!(
            "new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
            socket_addrs.domain(),
            validate_domain,
            timeout
        );
        if validate_domain && socket_addrs.domain().is_none() {
            return Err(Error::MissingDomain);
        }

        let tcp_stream = if let Some(timeout) = timeout {
            let connected = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
            connected.set_read_timeout(Some(timeout))?;
            connected.set_write_timeout(Some(timeout))?;
            connected
        } else {
            TcpStream::connect(socket_addrs.clone())?
        };

        Self::new_ssl_from_stream(socket_addrs, validate_domain, tcp_stream)
    }

    /// Create a new SSL client using an existing TcpStream
    ///
    /// With `validate_domain` set to `false` certificate verification is disabled. The domain
    /// of `socket_addrs`, or the literal `"NONE"` when there is none, is handed to the TLS
    /// connector for the handshake.
    pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
        socket_addrs: A,
        validate_domain: bool,
        stream: TcpStream,
    ) -> Result<Self, Error> {
        let mut builder =
            SslConnector::builder(SslMethod::tls()).map_err(Error::InvalidSslMethod)?;
        // TODO: support for certificate pinning
        if !validate_domain {
            builder.set_verify(SslVerifyMode::NONE);
        } else if socket_addrs.domain().is_none() {
            return Err(Error::MissingDomain);
        }
        let connector = builder.build();

        let domain = socket_addrs.domain().unwrap_or("NONE");
        let tls_stream = connector
            .connect(domain, stream)
            .map_err(Error::SslHandshakeError)?;

        Ok(Self::from(tls_stream))
    }
}
291
#[cfg(all(
    any(
        feature = "default",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ),
    not(feature = "use-openssl")
))]
// Certificate "verifier" used when the caller explicitly opts out of certificate validation.
// Every check below succeeds unconditionally, so a connection using it is encrypted but NOT
// authenticated.
mod danger {
    use crate::raw_client::ServerName;
    use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
    use rustls::crypto::CryptoProvider;
    use rustls::pki_types::{CertificateDer, UnixTime};
    use rustls::DigitallySignedStruct;

    /// A server certificate verifier that accepts any certificate and any handshake signature.
    ///
    /// The wrapped `CryptoProvider` is only used to report the supported signature schemes.
    #[derive(Debug)]
    pub struct NoCertificateVerification(CryptoProvider);

    impl NoCertificateVerification {
        /// Creates the verifier around the given crypto provider
        pub fn new(provider: CryptoProvider) -> Self {
            Self(provider)
        }
    }

    impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
        // Accepts the certificate chain without looking at any of the arguments
        fn verify_server_cert(
            &self,
            _end_entity: &CertificateDer<'_>,
            _intermediates: &[CertificateDer<'_>],
            _server_name: &ServerName<'_>,
            _ocsp: &[u8],
            _now: UnixTime,
        ) -> Result<ServerCertVerified, rustls::Error> {
            Ok(ServerCertVerified::assertion())
        }

        // Accepts any TLS 1.2 handshake signature without checking it
        fn verify_tls12_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, rustls::Error> {
            Ok(HandshakeSignatureValid::assertion())
        }

        // Accepts any TLS 1.3 handshake signature without checking it
        fn verify_tls13_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, rustls::Error> {
            Ok(HandshakeSignatureValid::assertion())
        }

        // Advertises the signature schemes supported by the wrapped crypto provider
        fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
            self.0.signature_verification_algorithms.supported_schemes()
        }
    }
}
351
#[cfg(all(
    any(
        feature = "default",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ),
    not(feature = "use-openssl")
))]
/// Transport type used to establish a Rustls TLS encrypted/authenticated connection with the server
pub type ElectrumSslStream = StreamOwned<ClientConnection, TcpStream>;
#[cfg(all(
    any(
        feature = "default",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ),
    not(feature = "use-openssl")
))]
impl RawClient<ElectrumSslStream> {
    /// Creates a new SSL client and tries to connect to `socket_addrs`. Optionally, if
    /// `validate_domain` is `true`, validate the server's certificate.
    ///
    /// Fails with `Error::MissingDomain` before connecting when validation is requested but
    /// `socket_addrs` has no domain. When `timeout` is set, it bounds the TCP connection phase
    /// and is installed as the read and write timeout of the underlying TCP stream.
    pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
        socket_addrs: A,
        validate_domain: bool,
        timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        debug!(
            "new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
            socket_addrs.domain(),
            validate_domain,
            timeout
        );
        if validate_domain && socket_addrs.domain().is_none() {
            return Err(Error::MissingDomain);
        }

        let tcp_stream = if let Some(timeout) = timeout {
            let connected = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
            connected.set_read_timeout(Some(timeout))?;
            connected.set_write_timeout(Some(timeout))?;
            connected
        } else {
            TcpStream::connect(socket_addrs.clone())?
        };

        Self::new_ssl_from_stream(socket_addrs, validate_domain, tcp_stream)
    }

    /// Create a new SSL client using an existing TcpStream
    ///
    /// With `validate_domain` set to `true` the server certificate is checked against the
    /// `webpki_roots` trust anchors; otherwise a verifier accepting any certificate is
    /// installed. The domain of `socket_addr`, or the literal `"NONE"` when there is none, is
    /// used as the TLS server name.
    pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
        socket_addr: A,
        validate_domain: bool,
        tcp_stream: TcpStream,
    ) -> Result<Self, Error> {
        use std::convert::TryFrom;

        let builder = ClientConfig::builder();

        let config = if !validate_domain {
            builder
                .dangerous()
                .with_custom_certificate_verifier(std::sync::Arc::new(
                    #[cfg(feature = "use-rustls")]
                    danger::NoCertificateVerification::new(rustls::crypto::aws_lc_rs::default_provider()),
                    #[cfg(feature = "use-rustls-ring")]
                    danger::NoCertificateVerification::new(rustls::crypto::ring::default_provider()),
                ))
                .with_no_client_auth()
        } else {
            if socket_addr.domain().is_none() {
                return Err(Error::MissingDomain);
            }

            let store: RootCertStore = webpki_roots::TLS_SERVER_ROOTS
                .iter()
                .map(|t| TrustAnchor {
                    subject: Der::from_slice(t.subject),
                    subject_public_key_info: Der::from_slice(t.spki),
                    name_constraints: t.name_constraints.map(Der::from_slice),
                })
                .collect();

            // TODO: cert pinning
            builder.with_root_certificates(store).with_no_client_auth()
        };

        let domain = socket_addr.domain().unwrap_or("NONE").to_string();
        let server_name = ServerName::try_from(domain.clone())
            .map_err(|_| Error::InvalidDNSNameError(domain))?;

        let session = ClientConnection::new(std::sync::Arc::new(config), server_name)
            .map_err(Error::CouldNotCreateConnection)?;

        Ok(Self::from(StreamOwned::new(session, tcp_stream)))
    }
}
449
#[cfg(any(feature = "default", feature = "proxy"))]
/// Transport type used to establish a connection to a server through a socks proxy
pub type ElectrumProxyStream = Socks5Stream;
#[cfg(any(feature = "default", feature = "proxy"))]
impl RawClient<ElectrumProxyStream> {
    /// Creates a new socks client and tries to connect to `target_addr` using `proxy_addr` as a
    /// socks proxy server. The DNS resolution of `target_addr`, if required, is done
    /// through the proxy. This allows to specify, for instance, `.onion` addresses.
    ///
    /// The proxy credentials, when present in `proxy`, are used to authenticate. `timeout` is
    /// passed to the socks connection and installed as the read and write timeout of the
    /// resulting stream.
    pub fn new_proxy<T: ToTargetAddr>(
        target_addr: T,
        proxy: &crate::Socks5Config,
        timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        let mut stream = match proxy.credentials.as_ref() {
            Some(cred) => Socks5Stream::connect_with_password(
                &proxy.addr,
                target_addr,
                &cred.username,
                &cred.password,
                timeout,
            )?,
            None => Socks5Stream::connect(&proxy.addr, target_addr, timeout)?,
        };
        stream.get_mut().set_read_timeout(timeout)?;
        stream.get_mut().set_write_timeout(timeout)?;

        Ok(stream.into())
    }

    #[cfg(any(
        feature = "use-openssl",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ))]
    /// Creates a new TLS client that connects to `target_addr` using `proxy_addr` as a socks proxy
    /// server. The DNS resolution of `target_addr`, if required, is done through the proxy. This
    /// allows to specify, for instance, `.onion` addresses.
    ///
    /// `target_addr` is converted once; the same converted target is used both to reach the
    /// server through the proxy and to derive the domain for the TLS session.
    pub fn new_proxy_ssl<T: ToTargetAddr>(
        target_addr: T,
        validate_domain: bool,
        proxy: &crate::Socks5Config,
        timeout: Option<Duration>,
    ) -> Result<RawClient<ElectrumSslStream>, Error> {
        let target = target_addr.to_target_addr()?;

        // Both branches connect to the already converted `target`, so that the address reached
        // through the proxy can never differ from the one handed to the TLS layer below.
        let mut stream = match proxy.credentials.as_ref() {
            Some(cred) => Socks5Stream::connect_with_password(
                &proxy.addr,
                target.clone(),
                &cred.username,
                &cred.password,
                timeout,
            )?,
            None => Socks5Stream::connect(&proxy.addr, target.clone(), timeout)?,
        };
        stream.get_mut().set_read_timeout(timeout)?;
        stream.get_mut().set_write_timeout(timeout)?;

        RawClient::new_ssl_from_stream(target, validate_domain, stream.into_inner())
    }
}
511
/// Message delivered by the current reader thread to a thread waiting for a response
#[derive(Debug)]
enum ChannelMessage {
    /// The full JSON response matching the id the waiting thread registered
    Response(serde_json::Value),
    /// The previous reader is done: the receiver should try to become the reader itself
    WakeUp,
    /// Reading from the transport failed; the same error is shared with every waiting thread
    Error(Arc<std::io::Error>),
}
518
519impl<S: Read + Write> RawClient<S> {
520    // TODO: to enable this we have to find a way to allow concurrent read and writes to the
521    // underlying transport struct. This can be done pretty easily for TcpStream because it can be
522    // split into a "read" and a "write" object, but it's not as trivial for other types. Without
523    // such thing, this causes a deadlock, because the reader thread takes a lock on the
524    // `ClonableStream` before other threads can send a request to the server. They will block
525    // waiting for the reader to release the mutex, but this will never happen because the server
526    // didn't receive any request, so it has nothing to send back.
527    // pub fn reader_thread(&self) -> Result<(), Error> {
528    //     self._reader_thread(None).map(|_| ())
529    // }
530
531    fn _reader_thread(&self, until_message: Option<usize>) -> Result<serde_json::Value, Error> {
532        let mut raw_resp = String::new();
533        let resp = match self.buf_reader.try_lock() {
534            Ok(mut reader) => {
535                trace!(
536                    "Starting reader thread with `until_message` = {:?}",
537                    until_message
538                );
539
540                if let Some(until_message) = until_message {
541                    // If we are trying to start a reader thread but the corresponding sender is
542                    // missing from the map, exit immediately. This can happen with batch calls,
543                    // since the sender is shared for all the individual queries in a call. We
544                    // might have already received a response for that id, but we don't know it
545                    // yet. Exiting here forces the calling code to fallback to the sender-receiver
546                    // method, and it should find a message there waiting for it.
547                    if self.waiting_map.lock()?.get(&until_message).is_none() {
548                        return Err(Error::CouldntLockReader);
549                    }
550                }
551
552                // Loop over every message
553                loop {
554                    raw_resp.clear();
555
556                    if let Err(e) = reader.read_line(&mut raw_resp) {
557                        let error = Arc::new(e);
558                        for (_, s) in self.waiting_map.lock().unwrap().drain() {
559                            s.send(ChannelMessage::Error(error.clone()))?;
560                        }
561                        return Err(Error::SharedIOError(error));
562                    }
563                    trace!("<== {}", raw_resp);
564
565                    let resp: serde_json::Value = serde_json::from_str(&raw_resp)?;
566
567                    // Normally there is and id, but it's missing for spontaneous notifications
568                    // from the server
569                    let resp_id = resp["id"]
570                        .as_str()
571                        .and_then(|s| s.parse().ok())
572                        .or_else(|| resp["id"].as_u64().map(|i| i as usize));
573                    match resp_id {
574                        Some(resp_id) if until_message == Some(resp_id) => {
575                            // We have a valid id and it's exactly the one we were waiting for!
576                            trace!(
577                                "Reader thread {} received a response for its request",
578                                resp_id
579                            );
580
581                            // Remove ourselves from the "waiting map"
582                            let mut map = self.waiting_map.lock()?;
583                            map.remove(&resp_id);
584
585                            // If the map is not empty, we select a random thread to become the
586                            // new reader thread.
587                            if let Some(err) = map.values().find_map(|sender| {
588                                sender
589                                    .send(ChannelMessage::WakeUp)
590                                    .map_err(|err| {
591                                        warn!("Unable to wake up a thread, trying some other");
592                                        err
593                                    })
594                                    .err()
595                            }) {
596                                error!("All the threads has failed, giving up");
597                                return Err(err)?;
598                            }
599
600                            break Ok(resp);
601                        }
602                        Some(resp_id) => {
603                            // We have an id, but it's not our response. Notify the thread and
604                            // move on
605                            trace!("Reader thread received response for {}", resp_id);
606
607                            if let Some(sender) = self.waiting_map.lock()?.remove(&resp_id) {
608                                sender.send(ChannelMessage::Response(resp))?;
609                            } else {
610                                warn!("Missing listener for {}", resp_id);
611                            }
612                        }
613                        None => {
614                            // No id, that's probably a notification.
615                            let mut resp = resp;
616
617                            if let Some(method) = resp["method"].take().as_str() {
618                                self.handle_notification(method, resp["params"].take())?;
619                            } else {
620                                warn!("Unexpected response: {:?}", resp);
621                            }
622                        }
623                    }
624                }
625            }
626            Err(TryLockError::WouldBlock) => {
627                // If we "WouldBlock" here it means that there's already a reader thread
628                // running somewhere.
629                Err(Error::CouldntLockReader)
630            }
631            Err(TryLockError::Poisoned(e)) => Err(e)?,
632        };
633
634        let resp = resp?;
635        if let Some(err) = resp.get("error") {
636            Err(Error::Protocol(err.clone()))
637        } else {
638            Ok(resp)
639        }
640    }
641
642    fn call(&self, req: Request) -> Result<serde_json::Value, Error> {
643        // Add our listener to the map before we send the request, to make sure we don't get a
644        // reply before the receiver is added
645        let (sender, receiver) = channel();
646        self.waiting_map.lock()?.insert(req.id, sender);
647
648        let mut raw = serde_json::to_vec(&req)?;
649        trace!("==> {}", String::from_utf8_lossy(&raw));
650
651        raw.extend_from_slice(b"\n");
652        let mut stream = self.stream.lock()?;
653        stream.write_all(&raw)?;
654        stream.flush()?;
655        drop(stream); // release the lock
656
657        self.increment_calls();
658
659        let mut resp = match self.recv(&receiver, req.id) {
660            Ok(resp) => resp,
661            e @ Err(_) => {
662                // In case of error our sender could still be left in the map, depending on where
663                // the error happened. Just in case, try to remove it here
664                self.waiting_map.lock()?.remove(&req.id);
665                return e;
666            }
667        };
668        Ok(resp["result"].take())
669    }
670
671    fn recv(
672        &self,
673        receiver: &Receiver<ChannelMessage>,
674        req_id: usize,
675    ) -> Result<serde_json::Value, Error> {
676        loop {
677            // Try to take the lock on the reader. If we manage to do so, we'll become the reader
678            // thread until we get our reponse
679            match self._reader_thread(Some(req_id)) {
680                Ok(response) => break Ok(response),
681                Err(Error::CouldntLockReader) => {
682                    match receiver.recv()? {
683                        // Received our response, returning it
684                        ChannelMessage::Response(received) => break Ok(received),
685                        ChannelMessage::WakeUp => {
686                            // We have been woken up, this means that we should try becoming the
687                            // reader thread ourselves
688                            trace!("WakeUp for {}", req_id);
689
690                            continue;
691                        }
692                        ChannelMessage::Error(e) => {
693                            warn!("Received ChannelMessage::Error");
694
695                            break Err(Error::SharedIOError(e));
696                        }
697                    }
698                }
699                e @ Err(_) => break e,
700            }
701        }
702    }
703
704    fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
705        match method {
706            "blockchain.headers.subscribe" => self.headers.lock()?.append(
707                &mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
708                    .into_iter()
709                    .collect(),
710            ),
711            "blockchain.scripthash.subscribe" => {
712                let unserialized: ScriptNotification = serde_json::from_value(result)?;
713                let mut script_notifications = self.script_notifications.lock()?;
714
715                let queue = script_notifications
716                    .get_mut(&unserialized.scripthash)
717                    .ok_or(Error::NotSubscribed(unserialized.scripthash))?;
718
719                queue.push_back(unserialized.status);
720            }
721            _ => info!("received unknown notification for method `{}`", method),
722        }
723
724        Ok(())
725    }
726
727    pub(crate) fn internal_raw_call_with_vec(
728        &self,
729        method_name: &str,
730        params: Vec<Param>,
731    ) -> Result<serde_json::Value, Error> {
732        let req = Request::new_id(
733            self.last_id.fetch_add(1, Ordering::SeqCst),
734            method_name,
735            params,
736        );
737        let result = self.call(req)?;
738
739        Ok(result)
740    }
741
742    #[inline]
743    #[cfg(feature = "debug-calls")]
744    fn increment_calls(&self) {
745        self.calls.fetch_add(1, Ordering::SeqCst);
746    }
747
748    #[inline]
749    #[cfg(not(feature = "debug-calls"))]
750    fn increment_calls(&self) {}
751}
752
753impl<T: Read + Write> ElectrumApi for RawClient<T> {
754    fn raw_call(
755        &self,
756        method_name: &str,
757        params: impl IntoIterator<Item = Param>,
758    ) -> Result<serde_json::Value, Error> {
759        self.internal_raw_call_with_vec(method_name, params.into_iter().collect())
760    }
761
762    fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
763        let mut raw = Vec::new();
764
765        let mut missing_responses = BTreeSet::new();
766        let mut answers = BTreeMap::new();
767
768        // Add our listener to the map before we send the request, Here we will clone the sender
769        // for every request id, so that we only have to monitor one receiver.
770        let (sender, receiver) = channel();
771
772        for (method, params) in batch.iter() {
773            let req = Request::new_id(
774                self.last_id.fetch_add(1, Ordering::SeqCst),
775                method,
776                params.to_vec(),
777            );
778            missing_responses.insert(req.id);
779
780            self.waiting_map.lock()?.insert(req.id, sender.clone());
781
782            raw.append(&mut serde_json::to_vec(&req)?);
783            raw.extend_from_slice(b"\n");
784        }
785
786        if missing_responses.is_empty() {
787            return Ok(vec![]);
788        }
789
790        trace!("==> {}", String::from_utf8_lossy(&raw));
791
792        let mut stream = self.stream.lock()?;
793        stream.write_all(&raw)?;
794        stream.flush()?;
795        drop(stream); // release the lock
796
797        self.increment_calls();
798
799        for req_id in missing_responses.iter() {
800            match self.recv(&receiver, *req_id) {
801                Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
802                Err(e) => {
803                    // In case of error our sender could still be left in the map, depending on where
804                    // the error happened. Just in case, try to remove it here
805                    warn!("got error for req_id {}: {:?}", req_id, e);
806                    warn!("removing all waiting req of this batch");
807                    let mut guard = self.waiting_map.lock()?;
808                    for req_id in missing_responses.iter() {
809                        guard.remove(req_id);
810                    }
811                    return Err(e);
812                }
813            };
814        }
815
816        Ok(answers.into_values().collect())
817    }
818
819    fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
820        let req = Request::new_id(
821            self.last_id.fetch_add(1, Ordering::SeqCst),
822            "blockchain.headers.subscribe",
823            vec![],
824        );
825        let value = self.call(req)?;
826
827        Ok(serde_json::from_value(value)?)
828    }
829
830    fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
831        Ok(self.headers.lock()?.pop_front())
832    }
833
834    fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
835        let req = Request::new_id(
836            self.last_id.fetch_add(1, Ordering::SeqCst),
837            "blockchain.block.header",
838            vec![Param::Usize(height)],
839        );
840        let result = self.call(req)?;
841
842        Ok(Vec::<u8>::from_hex(
843            result
844                .as_str()
845                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
846        )?)
847    }
848
849    fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
850        let req = Request::new_id(
851            self.last_id.fetch_add(1, Ordering::SeqCst),
852            "blockchain.block.headers",
853            vec![Param::Usize(start_height), Param::Usize(count)],
854        );
855        let result = self.call(req)?;
856
857        let mut deserialized: GetHeadersRes = serde_json::from_value(result)?;
858        for i in 0..deserialized.count {
859            let (start, end) = (i * 80, (i + 1) * 80);
860            deserialized
861                .headers
862                .push(deserialize(&deserialized.raw_headers[start..end])?);
863        }
864        deserialized.raw_headers.clear();
865
866        Ok(deserialized)
867    }
868
869    fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
870        let req = Request::new_id(
871            self.last_id.fetch_add(1, Ordering::SeqCst),
872            "blockchain.estimatefee",
873            vec![Param::Usize(number)],
874        );
875        let result = self.call(req)?;
876
877        result
878            .as_f64()
879            .ok_or_else(|| Error::InvalidResponse(result.clone()))
880    }
881
882    fn relay_fee(&self) -> Result<f64, Error> {
883        let req = Request::new_id(
884            self.last_id.fetch_add(1, Ordering::SeqCst),
885            "blockchain.relayfee",
886            vec![],
887        );
888        let result = self.call(req)?;
889
890        result
891            .as_f64()
892            .ok_or_else(|| Error::InvalidResponse(result.clone()))
893    }
894
895    fn script_subscribe(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
896        let script_hash = script.to_electrum_scripthash();
897        let mut script_notifications = self.script_notifications.lock()?;
898
899        if script_notifications.contains_key(&script_hash) {
900            return Err(Error::AlreadySubscribed(script_hash));
901        }
902
903        script_notifications.insert(script_hash, VecDeque::new());
904        drop(script_notifications);
905
906        let req = Request::new_id(
907            self.last_id.fetch_add(1, Ordering::SeqCst),
908            "blockchain.scripthash.subscribe",
909            vec![Param::String(script_hash.to_hex())],
910        );
911        let value = self.call(req)?;
912
913        Ok(serde_json::from_value(value)?)
914    }
915
916    fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
917    where
918        I: IntoIterator + Clone,
919        I::Item: Borrow<&'s Script>,
920    {
921        {
922            let mut script_notifications = self.script_notifications.lock()?;
923
924            for script in scripts.clone() {
925                let script_hash = script.borrow().to_electrum_scripthash();
926                if script_notifications.contains_key(&script_hash) {
927                    return Err(Error::AlreadySubscribed(script_hash));
928                }
929                script_notifications.insert(script_hash, VecDeque::new());
930            }
931        }
932        impl_batch_call!(self, scripts, script_subscribe)
933    }
934
935    fn script_unsubscribe(&self, script: &Script) -> Result<bool, Error> {
936        let script_hash = script.to_electrum_scripthash();
937        let mut script_notifications = self.script_notifications.lock()?;
938
939        if !script_notifications.contains_key(&script_hash) {
940            return Err(Error::NotSubscribed(script_hash));
941        }
942
943        let req = Request::new_id(
944            self.last_id.fetch_add(1, Ordering::SeqCst),
945            "blockchain.scripthash.unsubscribe",
946            vec![Param::String(script_hash.to_hex())],
947        );
948        let value = self.call(req)?;
949        let answer = serde_json::from_value(value)?;
950
951        script_notifications.remove(&script_hash);
952
953        Ok(answer)
954    }
955
956    fn script_pop(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
957        let script_hash = script.to_electrum_scripthash();
958
959        match self.script_notifications.lock()?.get_mut(&script_hash) {
960            None => Err(Error::NotSubscribed(script_hash)),
961            Some(queue) => Ok(queue.pop_front()),
962        }
963    }
964
965    fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes, Error> {
966        let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
967        let req = Request::new_id(
968            self.last_id.fetch_add(1, Ordering::SeqCst),
969            "blockchain.scripthash.get_balance",
970            params,
971        );
972        let result = self.call(req)?;
973
974        Ok(serde_json::from_value(result)?)
975    }
976    fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
977    where
978        I: IntoIterator + Clone,
979        I::Item: Borrow<&'s Script>,
980    {
981        impl_batch_call!(self, scripts, script_get_balance)
982    }
983
984    fn script_get_history(&self, script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
985        let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
986        let req = Request::new_id(
987            self.last_id.fetch_add(1, Ordering::SeqCst),
988            "blockchain.scripthash.get_history",
989            params,
990        );
991        let result = self.call(req)?;
992
993        Ok(serde_json::from_value(result)?)
994    }
995    fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
996    where
997        I: IntoIterator + Clone,
998        I::Item: Borrow<&'s Script>,
999    {
1000        impl_batch_call!(self, scripts, script_get_history)
1001    }
1002
1003    fn script_list_unspent(&self, script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
1004        let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
1005        let req = Request::new_id(
1006            self.last_id.fetch_add(1, Ordering::SeqCst),
1007            "blockchain.scripthash.listunspent",
1008            params,
1009        );
1010        let result = self.call(req)?;
1011        let mut result: Vec<ListUnspentRes> = serde_json::from_value(result)?;
1012
1013        // This should not be necessary, since the protocol documentation says that the txs should
1014        // be "in blockchain order" (https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-listunspent).
1015        // However, elects seems to be ignoring this at the moment, so we'll sort again here just
1016        // to make sure the result is consistent.
1017        result.sort_unstable_by_key(|k| (k.height, k.tx_pos));
1018        Ok(result)
1019    }
1020
1021    fn batch_script_list_unspent<'s, I>(
1022        &self,
1023        scripts: I,
1024    ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
1025    where
1026        I: IntoIterator + Clone,
1027        I::Item: Borrow<&'s Script>,
1028    {
1029        impl_batch_call!(self, scripts, script_list_unspent)
1030    }
1031
1032    fn transaction_get_raw(&self, txid: &Txid) -> Result<Vec<u8>, Error> {
1033        let params = vec![Param::String(format!("{:x}", txid))];
1034        let req = Request::new_id(
1035            self.last_id.fetch_add(1, Ordering::SeqCst),
1036            "blockchain.transaction.get",
1037            params,
1038        );
1039        let result = self.call(req)?;
1040
1041        Ok(Vec::<u8>::from_hex(
1042            result
1043                .as_str()
1044                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
1045        )?)
1046    }
1047
1048    fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
1049    where
1050        I: IntoIterator + Clone,
1051        I::Item: Borrow<&'t Txid>,
1052    {
1053        let txs_string: Result<Vec<String>, Error> = impl_batch_call!(self, txids, transaction_get);
1054        txs_string?
1055            .iter()
1056            .map(|s| Ok(Vec::<u8>::from_hex(s)?))
1057            .collect()
1058    }
1059
1060    fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
1061    where
1062        I: IntoIterator + Clone,
1063        I::Item: Borrow<u32>,
1064    {
1065        let headers_string: Result<Vec<String>, Error> =
1066            impl_batch_call!(self, heights, block_header, apply_deref);
1067        headers_string?
1068            .iter()
1069            .map(|s| Ok(Vec::<u8>::from_hex(s)?))
1070            .collect()
1071    }
1072
1073    fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
1074    where
1075        I: IntoIterator + Clone,
1076        I::Item: Borrow<usize>,
1077    {
1078        impl_batch_call!(self, numbers, estimate_fee, apply_deref)
1079    }
1080
1081    fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
1082        let params = vec![Param::String(raw_tx.to_lower_hex_string())];
1083        let req = Request::new_id(
1084            self.last_id.fetch_add(1, Ordering::SeqCst),
1085            "blockchain.transaction.broadcast",
1086            params,
1087        );
1088        let result = self.call(req)?;
1089
1090        Ok(serde_json::from_value(result)?)
1091    }
1092
1093    fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
1094        let params = vec![Param::String(format!("{:x}", txid)), Param::Usize(height)];
1095        let req = Request::new_id(
1096            self.last_id.fetch_add(1, Ordering::SeqCst),
1097            "blockchain.transaction.get_merkle",
1098            params,
1099        );
1100        let result = self.call(req)?;
1101
1102        Ok(serde_json::from_value(result)?)
1103    }
1104
1105    fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
1106        let params = vec![Param::Usize(height), Param::Usize(tx_pos)];
1107        let req = Request::new_id(
1108            self.last_id.fetch_add(1, Ordering::SeqCst),
1109            "blockchain.transaction.id_from_pos",
1110            params,
1111        );
1112        let result = self.call(req)?;
1113
1114        Ok(serde_json::from_value(result)?)
1115    }
1116
1117    fn txid_from_pos_with_merkle(
1118        &self,
1119        height: usize,
1120        tx_pos: usize,
1121    ) -> Result<TxidFromPosRes, Error> {
1122        let params = vec![
1123            Param::Usize(height),
1124            Param::Usize(tx_pos),
1125            Param::Bool(true),
1126        ];
1127        let req = Request::new_id(
1128            self.last_id.fetch_add(1, Ordering::SeqCst),
1129            "blockchain.transaction.id_from_pos",
1130            params,
1131        );
1132        let result = self.call(req)?;
1133
1134        Ok(serde_json::from_value(result)?)
1135    }
1136
1137    fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
1138        let req = Request::new_id(
1139            self.last_id.fetch_add(1, Ordering::SeqCst),
1140            "server.features",
1141            vec![],
1142        );
1143        let result = self.call(req)?;
1144
1145        Ok(serde_json::from_value(result)?)
1146    }
1147
1148    fn ping(&self) -> Result<(), Error> {
1149        let req = Request::new_id(
1150            self.last_id.fetch_add(1, Ordering::SeqCst),
1151            "server.ping",
1152            vec![],
1153        );
1154        self.call(req)?;
1155
1156        Ok(())
1157    }
1158
1159    #[cfg(feature = "debug-calls")]
1160    fn calls_made(&self) -> Result<usize, Error> {
1161        Ok(self.calls.load(Ordering::SeqCst))
1162    }
1163}
1164
1165#[cfg(test)]
1166mod test {
1167    use std::str::FromStr;
1168
1169    use crate::utils;
1170
1171    use super::RawClient;
1172    use crate::api::ElectrumApi;
1173
1174    fn get_test_server() -> String {
1175        std::env::var("TEST_ELECTRUM_SERVER").unwrap_or("electrum.blockstream.info:50001".into())
1176    }
1177
1178    #[test]
1179    fn test_server_features_simple() {
1180        let client = RawClient::new(get_test_server(), None).unwrap();
1181
1182        let resp = client.server_features().unwrap();
1183        assert_eq!(
1184            resp.genesis_hash,
1185            [
1186                0, 0, 0, 0, 0, 25, 214, 104, 156, 8, 90, 225, 101, 131, 30, 147, 79, 247, 99, 174,
1187                70, 162, 166, 193, 114, 179, 241, 182, 10, 140, 226, 111
1188            ],
1189        );
1190        assert_eq!(resp.hash_function, Some("sha256".into()));
1191        assert_eq!(resp.pruning, None);
1192    }
1193    #[test]
1194    fn test_relay_fee() {
1195        let client = RawClient::new(get_test_server(), None).unwrap();
1196
1197        let resp = client.relay_fee().unwrap();
1198        assert_eq!(resp, 0.00001);
1199    }
1200
1201    #[test]
1202    fn test_estimate_fee() {
1203        let client = RawClient::new(get_test_server(), None).unwrap();
1204
1205        let resp = client.estimate_fee(10).unwrap();
1206        assert!(resp > 0.0);
1207    }
1208
1209    #[test]
1210    fn test_block_header() {
1211        let client = RawClient::new(get_test_server(), None).unwrap();
1212
1213        let resp = client.block_header(0).unwrap();
1214        assert_eq!(resp.version, bitcoin::block::Version::ONE);
1215        assert_eq!(resp.time, 1231006505);
1216        assert_eq!(resp.nonce, 0x7c2bac1d);
1217    }
1218
1219    #[test]
1220    fn test_block_header_raw() {
1221        let client = RawClient::new(get_test_server(), None).unwrap();
1222
1223        let resp = client.block_header_raw(0).unwrap();
1224        assert_eq!(
1225            resp,
1226            vec![
1227                1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1228                0, 0, 0, 0, 0, 0, 0, 0, 59, 163, 237, 253, 122, 123, 18, 178, 122, 199, 44, 62,
1229                103, 118, 143, 97, 127, 200, 27, 195, 136, 138, 81, 50, 58, 159, 184, 170, 75, 30,
1230                94, 74, 41, 171, 95, 73, 255, 255, 0, 29, 29, 172, 43, 124
1231            ]
1232        );
1233    }
1234
1235    #[test]
1236    fn test_block_headers() {
1237        let client = RawClient::new(get_test_server(), None).unwrap();
1238
1239        let resp = client.block_headers(0, 4).unwrap();
1240        assert_eq!(resp.count, 4);
1241        assert_eq!(resp.max, 2016);
1242        assert_eq!(resp.headers.len(), 4);
1243
1244        assert_eq!(resp.headers[0].time, 1231006505);
1245    }
1246
1247    #[test]
1248    fn test_script_get_balance() {
1249        use std::str::FromStr;
1250
1251        let client = RawClient::new(get_test_server(), None).unwrap();
1252
1253        // Realistically nobody will ever spend from this address, so we can expect the balance to
1254        // increase over time
1255        let addr = bitcoin::Address::from_str("1CounterpartyXXXXXXXXXXXXXXXUWLpVr")
1256            .unwrap()
1257            .assume_checked();
1258        let resp = client.script_get_balance(&addr.script_pubkey()).unwrap();
1259        assert!(resp.confirmed >= 213091301265);
1260    }
1261
1262    #[test]
1263    fn test_script_get_history() {
1264        use std::str::FromStr;
1265
1266        use bitcoin::Txid;
1267
1268        let client = RawClient::new(get_test_server(), None).unwrap();
1269
1270        // Mt.Gox hack address
1271        let addr = bitcoin::Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF")
1272            .unwrap()
1273            .assume_checked();
1274        let resp = client.script_get_history(&addr.script_pubkey()).unwrap();
1275
1276        assert!(resp.len() >= 328);
1277        assert_eq!(
1278            resp[0].tx_hash,
1279            Txid::from_str("e67a0550848b7932d7796aeea16ab0e48a5cfe81c4e8cca2c5b03e0416850114")
1280                .unwrap()
1281        );
1282    }
1283
1284    #[test]
1285    fn test_script_list_unspent() {
1286        use bitcoin::Txid;
1287        use std::str::FromStr;
1288
1289        let client = RawClient::new(get_test_server(), None).unwrap();
1290
1291        // Peter todd's sha256 bounty address https://bitcointalk.org/index.php?topic=293382.0
1292        let addr = bitcoin::Address::from_str("35Snmmy3uhaer2gTboc81ayCip4m9DT4ko")
1293            .unwrap()
1294            .assume_checked();
1295        let resp = client.script_list_unspent(&addr.script_pubkey()).unwrap();
1296
1297        assert!(resp.len() >= 9);
1298        let txid = "397f12ee15f8a3d2ab25c0f6bb7d3c64d2038ca056af10dd8251b98ae0f076b0";
1299        let txid = Txid::from_str(txid).unwrap();
1300        let txs: Vec<_> = resp.iter().filter(|e| e.tx_hash == txid).collect();
1301        assert_eq!(txs.len(), 1);
1302        assert_eq!(txs[0].value, 10000000);
1303        assert_eq!(txs[0].height, 257674);
1304        assert_eq!(txs[0].tx_pos, 1);
1305    }
1306
1307    #[test]
1308    fn test_batch_script_list_unspent() {
1309        use std::str::FromStr;
1310
1311        let client = RawClient::new(get_test_server(), None).unwrap();
1312
1313        // Peter todd's sha256 bounty address https://bitcointalk.org/index.php?topic=293382.0
1314        let script_1 = bitcoin::Address::from_str("35Snmmy3uhaer2gTboc81ayCip4m9DT4ko")
1315            .unwrap()
1316            .assume_checked()
1317            .script_pubkey();
1318
1319        let resp = client
1320            .batch_script_list_unspent(vec![script_1.as_script()])
1321            .unwrap();
1322        assert_eq!(resp.len(), 1);
1323        assert!(resp[0].len() >= 9);
1324    }
1325
1326    #[test]
1327    fn test_batch_estimate_fee() {
1328        let client = RawClient::new(get_test_server(), None).unwrap();
1329
1330        let resp = client.batch_estimate_fee(vec![10, 20]).unwrap();
1331        assert_eq!(resp.len(), 2);
1332        assert!(resp[0] > 0.0);
1333        assert!(resp[1] > 0.0);
1334    }
1335
1336    #[test]
1337    fn test_transaction_get() {
1338        use bitcoin::{transaction, Txid};
1339
1340        let client = RawClient::new(get_test_server(), None).unwrap();
1341
1342        let resp = client
1343            .transaction_get(
1344                &Txid::from_str("cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566")
1345                    .unwrap(),
1346            )
1347            .unwrap();
1348        assert_eq!(resp.version, transaction::Version::ONE);
1349        assert_eq!(resp.lock_time.to_consensus_u32(), 0);
1350    }
1351
1352    #[test]
1353    fn test_transaction_get_raw() {
1354        use bitcoin::Txid;
1355
1356        let client = RawClient::new(get_test_server(), None).unwrap();
1357
1358        let resp = client
1359            .transaction_get_raw(
1360                &Txid::from_str("cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566")
1361                    .unwrap(),
1362            )
1363            .unwrap();
1364        assert_eq!(
1365            resp,
1366            vec![
1367                1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1368                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 84, 3, 240, 156, 9, 27, 77,
1369                105, 110, 101, 100, 32, 98, 121, 32, 65, 110, 116, 80, 111, 111, 108, 49, 49, 57,
1370                174, 0, 111, 32, 7, 77, 101, 40, 250, 190, 109, 109, 42, 177, 148, 141, 80, 179,
1371                217, 145, 226, 160, 130, 29, 247, 67, 88, 237, 156, 37, 83, 175, 0, 199, 166, 31,
1372                151, 119, 28, 160, 172, 238, 16, 110, 4, 0, 0, 0, 0, 0, 0, 0, 203, 236, 0, 128, 36,
1373                97, 249, 5, 255, 255, 255, 255, 3, 84, 206, 172, 42, 0, 0, 0, 0, 25, 118, 169, 20,
1374                17, 219, 228, 140, 198, 182, 23, 249, 198, 173, 175, 77, 158, 213, 246, 37, 177,
1375                199, 203, 89, 136, 172, 0, 0, 0, 0, 0, 0, 0, 0, 38, 106, 36, 170, 33, 169, 237, 46,
1376                87, 139, 206, 44, 166, 198, 188, 147, 89, 55, 115, 69, 216, 233, 133, 221, 95, 144,
1377                199, 132, 33, 255, 166, 239, 165, 235, 96, 66, 142, 105, 140, 0, 0, 0, 0, 0, 0, 0,
1378                0, 38, 106, 36, 185, 225, 27, 109, 47, 98, 29, 126, 195, 244, 90, 94, 202, 137,
1379                211, 234, 106, 41, 76, 223, 58, 4, 46, 151, 48, 9, 88, 68, 112, 161, 41, 22, 17,
1380                30, 44, 170, 1, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1381                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
1382            ]
1383        )
1384    }
1385
1386    #[test]
1387    fn test_transaction_get_merkle() {
1388        use bitcoin::Txid;
1389
1390        let client = RawClient::new(get_test_server(), None).unwrap();
1391
1392        let txid =
1393            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1394                .unwrap();
1395        let resp = client.transaction_get_merkle(&txid, 630000).unwrap();
1396        assert_eq!(resp.block_height, 630000);
1397        assert_eq!(resp.pos, 68);
1398        assert_eq!(resp.merkle.len(), 12);
1399        assert_eq!(
1400            resp.merkle[0],
1401            [
1402                34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47, 66,
1403                179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25
1404            ]
1405        );
1406
1407        // Check we can verify the merkle proof validity, but fail if we supply wrong data.
1408        let block_header = client.block_header(resp.block_height).unwrap();
1409        assert!(utils::validate_merkle_proof(
1410            &txid,
1411            &block_header.merkle_root,
1412            &resp
1413        ));
1414
1415        let mut fail_resp = resp.clone();
1416        fail_resp.pos = 13;
1417        assert!(!utils::validate_merkle_proof(
1418            &txid,
1419            &block_header.merkle_root,
1420            &fail_resp
1421        ));
1422
1423        let fail_block_header = client.block_header(resp.block_height + 1).unwrap();
1424        assert!(!utils::validate_merkle_proof(
1425            &txid,
1426            &fail_block_header.merkle_root,
1427            &resp
1428        ));
1429    }
1430
1431    #[test]
1432    fn test_txid_from_pos() {
1433        use bitcoin::Txid;
1434
1435        let client = RawClient::new(get_test_server(), None).unwrap();
1436
1437        let txid =
1438            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1439                .unwrap();
1440        let resp = client.txid_from_pos(630000, 68).unwrap();
1441        assert_eq!(resp, txid);
1442    }
1443
1444    #[test]
1445    fn test_txid_from_pos_with_merkle() {
1446        use bitcoin::Txid;
1447
1448        let client = RawClient::new(get_test_server(), None).unwrap();
1449
1450        let txid =
1451            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1452                .unwrap();
1453        let resp = client.txid_from_pos_with_merkle(630000, 68).unwrap();
1454        assert_eq!(resp.tx_hash, txid);
1455        assert_eq!(
1456            resp.merkle[0],
1457            [
1458                34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47, 66,
1459                179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25
1460            ]
1461        );
1462    }
1463
1464    #[test]
1465    fn test_ping() {
1466        let client = RawClient::new(get_test_server(), None).unwrap();
1467        client.ping().unwrap();
1468    }
1469
1470    #[test]
1471    fn test_block_headers_subscribe() {
1472        let client = RawClient::new(get_test_server(), None).unwrap();
1473        let resp = client.block_headers_subscribe().unwrap();
1474
1475        assert!(resp.height >= 639000);
1476    }
1477
1478    #[test]
1479    fn test_script_subscribe() {
1480        use std::str::FromStr;
1481
1482        let client = RawClient::new(get_test_server(), None).unwrap();
1483
1484        // Mt.Gox hack address
1485        let addr = bitcoin::Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF")
1486            .unwrap()
1487            .assume_checked();
1488
1489        // Just make sure that the call returns Ok(something)
1490        client.script_subscribe(&addr.script_pubkey()).unwrap();
1491    }
1492
1493    #[test]
1494    fn test_request_after_error() {
1495        let client = RawClient::new(get_test_server(), None).unwrap();
1496
1497        assert!(client.transaction_broadcast_raw(&[0x00]).is_err());
1498        assert!(client.server_features().is_ok());
1499    }
1500
1501    #[test]
1502    fn test_raw_call() {
1503        use crate::types::Param;
1504
1505        let client = RawClient::new(get_test_server(), None).unwrap();
1506
1507        let params = vec![
1508            Param::String(
1509                "cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566".to_string(),
1510            ),
1511            Param::Bool(false),
1512        ];
1513
1514        let resp = client
1515            .raw_call("blockchain.transaction.get", params)
1516            .unwrap();
1517
1518        assert_eq!(
1519            resp,
1520            "01000000000101000000000000000000000000000000000000000000000000000\
1521            0000000000000ffffffff5403f09c091b4d696e656420627920416e74506f6f6c3\
1522            13139ae006f20074d6528fabe6d6d2ab1948d50b3d991e2a0821df74358ed9c255\
1523            3af00c7a61f97771ca0acee106e0400000000000000cbec00802461f905fffffff\
1524            f0354ceac2a000000001976a91411dbe48cc6b617f9c6adaf4d9ed5f625b1c7cb5\
1525            988ac0000000000000000266a24aa21a9ed2e578bce2ca6c6bc9359377345d8e98\
1526            5dd5f90c78421ffa6efa5eb60428e698c0000000000000000266a24b9e11b6d2f6\
1527            21d7ec3f45a5eca89d3ea6a294cdf3a042e973009584470a12916111e2caa01200\
1528            000000000000000000000000000000000000000000000000000000000000000000\
1529            00000"
1530        )
1531    }
1532}