electrum_client/
raw_client.rs

1//! Raw client
2//!
3//! This module contains the definition of the raw client that wraps the transport method
4
5use std::borrow::Borrow;
6use std::collections::{BTreeMap, HashMap, VecDeque};
7use std::io::{BufRead, BufReader, Read, Write};
8use std::mem::drop;
9use std::net::{TcpStream, ToSocketAddrs};
10use std::sync::atomic::{AtomicUsize, Ordering};
11use std::sync::mpsc::{channel, Receiver, Sender};
12use std::sync::{Arc, Mutex, TryLockError};
13use std::time::Duration;
14
15#[allow(unused_imports)]
16use log::{debug, error, info, trace, warn};
17
18use bitcoin::consensus::encode::deserialize;
19use bitcoin::hex::{DisplayHex, FromHex};
20use bitcoin::{Script, Txid};
21
22#[cfg(feature = "use-openssl")]
23use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode};
24
25#[cfg(all(
26    any(
27        feature = "default",
28        feature = "use-rustls",
29        feature = "use-rustls-ring"
30    ),
31    not(feature = "use-openssl")
32))]
33use rustls::{
34    pki_types::ServerName,
35    pki_types::{Der, TrustAnchor},
36    ClientConfig, ClientConnection, RootCertStore, StreamOwned,
37};
38
39#[cfg(any(feature = "default", feature = "proxy"))]
40use crate::socks::{Socks5Stream, TargetAddr, ToTargetAddr};
41
42use crate::stream::ClonableStream;
43
44use crate::api::ElectrumApi;
45use crate::batch::Batch;
46use crate::types::*;
47
48macro_rules! impl_batch_call {
49    ( $self:expr, $data:expr, $call:ident ) => {{
50        impl_batch_call!($self, $data, $call, )
51    }};
52
53    ( $self:expr, $data:expr, $call:ident, apply_deref ) => {{
54        impl_batch_call!($self, $data, $call, *)
55    }};
56
57    ( $self:expr, $data:expr, $call:ident, $($apply_deref:tt)? ) => {{
58        let mut batch = Batch::default();
59        for i in $data {
60            batch.$call($($apply_deref)* i.borrow());
61        }
62
63        let resp = $self.batch_call(&batch)?;
64        let mut answer = Vec::new();
65
66        for x in resp {
67            answer.push(serde_json::from_value(x)?);
68        }
69
70        Ok(answer)
71    }};
72}
73
74/// A trait for [`ToSocketAddrs`](https://doc.rust-lang.org/std/net/trait.ToSocketAddrs.html) that
75/// can also be turned into a domain. Used when an SSL client needs to validate the server's
76/// certificate.
77pub trait ToSocketAddrsDomain: ToSocketAddrs {
78    /// Returns the domain, if present
79    fn domain(&self) -> Option<&str> {
80        None
81    }
82}
83
84impl ToSocketAddrsDomain for &str {
85    fn domain(&self) -> Option<&str> {
86        self.split(':').next()
87    }
88}
89
90impl ToSocketAddrsDomain for (&str, u16) {
91    fn domain(&self) -> Option<&str> {
92        self.0.domain()
93    }
94}
95
96#[cfg(any(feature = "default", feature = "proxy"))]
97impl ToSocketAddrsDomain for TargetAddr {
98    fn domain(&self) -> Option<&str> {
99        match self {
100            TargetAddr::Ip(_) => None,
101            TargetAddr::Domain(domain, _) => Some(domain.as_str()),
102        }
103    }
104}
105
106macro_rules! impl_to_socket_addrs_domain {
107    ( $ty:ty ) => {
108        impl ToSocketAddrsDomain for $ty {}
109    };
110}
111
112impl_to_socket_addrs_domain!(std::net::SocketAddr);
113impl_to_socket_addrs_domain!(std::net::SocketAddrV4);
114impl_to_socket_addrs_domain!(std::net::SocketAddrV6);
115impl_to_socket_addrs_domain!((std::net::IpAddr, u16));
116impl_to_socket_addrs_domain!((std::net::Ipv4Addr, u16));
117impl_to_socket_addrs_domain!((std::net::Ipv6Addr, u16));
118
119/// Instance of an Electrum client
120///
121/// A `Client` maintains a constant connection with an Electrum server and exposes methods to
122/// interact with it. It can also subscribe and receive notifictations from the server about new
123/// blocks or activity on a specific *scriptPubKey*.
124///
125/// The `Client` is modeled in such a way that allows the external caller to have full control over
126/// its functionality: no threads or tasks are spawned internally to monitor the state of the
127/// connection.
128///
129/// More transport methods can be used by manually creating an instance of this struct with an
130/// arbitray `S` type.
131#[derive(Debug)]
132pub struct RawClient<S>
133where
134    S: Read + Write,
135{
136    stream: Mutex<ClonableStream<S>>,
137    buf_reader: Mutex<BufReader<ClonableStream<S>>>,
138
139    last_id: AtomicUsize,
140    waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,
141
142    headers: Mutex<VecDeque<RawHeaderNotification>>,
143    script_notifications: Mutex<HashMap<ScriptHash, VecDeque<ScriptStatus>>>,
144
145    #[cfg(feature = "debug-calls")]
146    calls: AtomicUsize,
147}
148
149impl<S> From<S> for RawClient<S>
150where
151    S: Read + Write,
152{
153    fn from(stream: S) -> Self {
154        let stream: ClonableStream<_> = stream.into();
155
156        Self {
157            buf_reader: Mutex::new(BufReader::new(stream.clone())),
158            stream: Mutex::new(stream),
159
160            last_id: AtomicUsize::new(0),
161            waiting_map: Mutex::new(HashMap::new()),
162
163            headers: Mutex::new(VecDeque::new()),
164            script_notifications: Mutex::new(HashMap::new()),
165
166            #[cfg(feature = "debug-calls")]
167            calls: AtomicUsize::new(0),
168        }
169    }
170}
171
172/// Transport type used to establish a plaintext TCP connection with the server
173pub type ElectrumPlaintextStream = TcpStream;
174impl RawClient<ElectrumPlaintextStream> {
175    /// Creates a new plaintext client and tries to connect to `socket_addr`.
176    pub fn new<A: ToSocketAddrs>(
177        socket_addrs: A,
178        timeout: Option<Duration>,
179    ) -> Result<Self, Error> {
180        let stream = match timeout {
181            Some(timeout) => {
182                let stream = connect_with_total_timeout(socket_addrs, timeout)?;
183                stream.set_read_timeout(Some(timeout))?;
184                stream.set_write_timeout(Some(timeout))?;
185                stream
186            }
187            None => TcpStream::connect(socket_addrs)?,
188        };
189
190        Ok(stream.into())
191    }
192}
193
194fn connect_with_total_timeout<A: ToSocketAddrs>(
195    socket_addrs: A,
196    mut timeout: Duration,
197) -> Result<TcpStream, Error> {
198    // Use the same algorithm as curl: 1/2 on the first host, 1/4 on the second one, etc.
199    // https://curl.se/mail/lib-2014-11/0164.html
200
201    let mut errors = Vec::new();
202
203    let addrs = socket_addrs
204        .to_socket_addrs()?
205        .enumerate()
206        .collect::<Vec<_>>();
207    for (index, addr) in &addrs {
208        if *index < addrs.len() - 1 {
209            timeout = timeout.div_f32(2.0);
210        }
211
212        info!(
213            "Trying to connect to {} (attempt {}/{}) with timeout {:?}",
214            addr,
215            index + 1,
216            addrs.len(),
217            timeout
218        );
219        match TcpStream::connect_timeout(addr, timeout) {
220            Ok(socket) => return Ok(socket),
221            Err(e) => {
222                warn!("Connection error: {:?}", e);
223                errors.push(e.into());
224            }
225        }
226    }
227
228    Err(Error::AllAttemptsErrored(errors))
229}
230
231#[cfg(feature = "use-openssl")]
232/// Transport type used to establish an OpenSSL TLS encrypted/authenticated connection with the server
233pub type ElectrumSslStream = SslStream<TcpStream>;
234#[cfg(feature = "use-openssl")]
235impl RawClient<ElectrumSslStream> {
236    /// Creates a new SSL client and tries to connect to `socket_addr`. Optionally, if
237    /// `validate_domain` is `true`, validate the server's certificate.
238    pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
239        socket_addrs: A,
240        validate_domain: bool,
241        timeout: Option<Duration>,
242    ) -> Result<Self, Error> {
243        debug!(
244            "new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
245            socket_addrs.domain(),
246            validate_domain,
247            timeout
248        );
249        if validate_domain {
250            socket_addrs.domain().ok_or(Error::MissingDomain)?;
251        }
252        match timeout {
253            Some(timeout) => {
254                let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
255                stream.set_read_timeout(Some(timeout))?;
256                stream.set_write_timeout(Some(timeout))?;
257                Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
258            }
259            None => {
260                let stream = TcpStream::connect(socket_addrs.clone())?;
261                Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
262            }
263        }
264    }
265
266    /// Create a new SSL client using an existing TcpStream
267    pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
268        socket_addrs: A,
269        validate_domain: bool,
270        stream: TcpStream,
271    ) -> Result<Self, Error> {
272        let mut builder =
273            SslConnector::builder(SslMethod::tls()).map_err(Error::InvalidSslMethod)?;
274        // TODO: support for certificate pinning
275        if validate_domain {
276            socket_addrs.domain().ok_or(Error::MissingDomain)?;
277        } else {
278            builder.set_verify(SslVerifyMode::NONE);
279        }
280        let connector = builder.build();
281
282        let domain = socket_addrs.domain().unwrap_or("NONE").to_string();
283
284        let stream = connector
285            .connect(&domain, stream)
286            .map_err(Error::SslHandshakeError)?;
287
288        Ok(stream.into())
289    }
290}
291
292#[cfg(all(
293    any(
294        feature = "default",
295        feature = "use-rustls",
296        feature = "use-rustls-ring"
297    ),
298    not(feature = "use-openssl")
299))]
300mod danger {
301    use crate::raw_client::ServerName;
302    use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
303    use rustls::crypto::CryptoProvider;
304    use rustls::pki_types::{CertificateDer, UnixTime};
305    use rustls::DigitallySignedStruct;
306
307    #[derive(Debug)]
308    pub struct NoCertificateVerification(CryptoProvider);
309
310    impl NoCertificateVerification {
311        pub fn new(provider: CryptoProvider) -> Self {
312            Self(provider)
313        }
314    }
315
316    impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
317        fn verify_server_cert(
318            &self,
319            _end_entity: &CertificateDer<'_>,
320            _intermediates: &[CertificateDer<'_>],
321            _server_name: &ServerName<'_>,
322            _ocsp: &[u8],
323            _now: UnixTime,
324        ) -> Result<ServerCertVerified, rustls::Error> {
325            Ok(ServerCertVerified::assertion())
326        }
327
328        fn verify_tls12_signature(
329            &self,
330            _message: &[u8],
331            _cert: &CertificateDer<'_>,
332            _dss: &DigitallySignedStruct,
333        ) -> Result<HandshakeSignatureValid, rustls::Error> {
334            Ok(HandshakeSignatureValid::assertion())
335        }
336
337        fn verify_tls13_signature(
338            &self,
339            _message: &[u8],
340            _cert: &CertificateDer<'_>,
341            _dss: &DigitallySignedStruct,
342        ) -> Result<HandshakeSignatureValid, rustls::Error> {
343            Ok(HandshakeSignatureValid::assertion())
344        }
345
346        fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
347            self.0.signature_verification_algorithms.supported_schemes()
348        }
349    }
350}
351
352#[cfg(all(
353    any(
354        feature = "default",
355        feature = "use-rustls",
356        feature = "use-rustls-ring"
357    ),
358    not(feature = "use-openssl")
359))]
360/// Transport type used to establish a Rustls TLS encrypted/authenticated connection with the server
361pub type ElectrumSslStream = StreamOwned<ClientConnection, TcpStream>;
362#[cfg(all(
363    any(
364        feature = "default",
365        feature = "use-rustls",
366        feature = "use-rustls-ring"
367    ),
368    not(feature = "use-openssl")
369))]
370impl RawClient<ElectrumSslStream> {
371    /// Creates a new SSL client and tries to connect to `socket_addr`. Optionally, if
372    /// `validate_domain` is `true`, validate the server's certificate.
373    pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
374        socket_addrs: A,
375        validate_domain: bool,
376        timeout: Option<Duration>,
377    ) -> Result<Self, Error> {
378        debug!(
379            "new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
380            socket_addrs.domain(),
381            validate_domain,
382            timeout
383        );
384        if validate_domain {
385            socket_addrs.domain().ok_or(Error::MissingDomain)?;
386        }
387        match timeout {
388            Some(timeout) => {
389                let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
390                stream.set_read_timeout(Some(timeout))?;
391                stream.set_write_timeout(Some(timeout))?;
392                Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
393            }
394            None => {
395                let stream = TcpStream::connect(socket_addrs.clone())?;
396                Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
397            }
398        }
399    }
400
401    /// Create a new SSL client using an existing TcpStream
402    pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
403        socket_addr: A,
404        validate_domain: bool,
405        tcp_stream: TcpStream,
406    ) -> Result<Self, Error> {
407        use std::convert::TryFrom;
408
409        if rustls::crypto::CryptoProvider::get_default().is_none() {
410            // We install a crypto provider depending on the set feature.
411            #[cfg(feature = "use-rustls")]
412            rustls::crypto::CryptoProvider::install_default(
413                rustls::crypto::aws_lc_rs::default_provider(),
414            )
415            .map_err(|_| {
416                Error::CouldNotCreateConnection(rustls::Error::General(
417                    "Failed to install CryptoProvider".to_string(),
418                ))
419            })?;
420
421            #[cfg(feature = "use-rustls-ring")]
422            rustls::crypto::CryptoProvider::install_default(
423                rustls::crypto::ring::default_provider(),
424            )
425            .map_err(|_| {
426                Error::CouldNotCreateConnection(rustls::Error::General(
427                    "Failed to install CryptoProvider".to_string(),
428                ))
429            })?;
430        }
431
432        let builder = ClientConfig::builder();
433
434        let config = if validate_domain {
435            socket_addr.domain().ok_or(Error::MissingDomain)?;
436
437            let store = webpki_roots::TLS_SERVER_ROOTS
438                .iter()
439                .map(|t| TrustAnchor {
440                    subject: Der::from_slice(t.subject),
441                    subject_public_key_info: Der::from_slice(t.spki),
442                    name_constraints: t.name_constraints.map(Der::from_slice),
443                })
444                .collect::<RootCertStore>();
445
446            // TODO: cert pinning
447            builder.with_root_certificates(store).with_no_client_auth()
448        } else {
449            builder
450                .dangerous()
451                .with_custom_certificate_verifier(std::sync::Arc::new(
452                    #[cfg(feature = "use-rustls")]
453                    danger::NoCertificateVerification::new(rustls::crypto::aws_lc_rs::default_provider()),
454                    #[cfg(feature = "use-rustls-ring")]
455                    danger::NoCertificateVerification::new(rustls::crypto::ring::default_provider()),
456                ))
457                .with_no_client_auth()
458        };
459
460        let domain = socket_addr.domain().unwrap_or("NONE").to_string();
461        let session = ClientConnection::new(
462            std::sync::Arc::new(config),
463            ServerName::try_from(domain.clone())
464                .map_err(|_| Error::InvalidDNSNameError(domain.clone()))?,
465        )
466        .map_err(Error::CouldNotCreateConnection)?;
467        let stream = StreamOwned::new(session, tcp_stream);
468
469        Ok(stream.into())
470    }
471}
472
473#[cfg(any(feature = "default", feature = "proxy"))]
474/// Transport type used to establish a connection to a server through a socks proxy
475pub type ElectrumProxyStream = Socks5Stream;
476#[cfg(any(feature = "default", feature = "proxy"))]
477impl RawClient<ElectrumProxyStream> {
478    /// Creates a new socks client and tries to connect to `target_addr` using `proxy_addr` as a
479    /// socks proxy server. The DNS resolution of `target_addr`, if required, is done
480    /// through the proxy. This allows to specify, for instance, `.onion` addresses.
481    pub fn new_proxy<T: ToTargetAddr>(
482        target_addr: T,
483        proxy: &crate::Socks5Config,
484        timeout: Option<Duration>,
485    ) -> Result<Self, Error> {
486        let mut stream = match proxy.credentials.as_ref() {
487            Some(cred) => Socks5Stream::connect_with_password(
488                &proxy.addr,
489                target_addr,
490                &cred.username,
491                &cred.password,
492                timeout,
493            )?,
494            None => Socks5Stream::connect(&proxy.addr, target_addr, timeout)?,
495        };
496        stream.get_mut().set_read_timeout(timeout)?;
497        stream.get_mut().set_write_timeout(timeout)?;
498
499        Ok(stream.into())
500    }
501
502    #[cfg(any(
503        feature = "use-openssl",
504        feature = "use-rustls",
505        feature = "use-rustls-ring"
506    ))]
507    /// Creates a new TLS client that connects to `target_addr` using `proxy_addr` as a socks proxy
508    /// server. The DNS resolution of `target_addr`, if required, is done through the proxy. This
509    /// allows to specify, for instance, `.onion` addresses.
510    pub fn new_proxy_ssl<T: ToTargetAddr>(
511        target_addr: T,
512        validate_domain: bool,
513        proxy: &crate::Socks5Config,
514        timeout: Option<Duration>,
515    ) -> Result<RawClient<ElectrumSslStream>, Error> {
516        let target = target_addr.to_target_addr()?;
517
518        let mut stream = match proxy.credentials.as_ref() {
519            Some(cred) => Socks5Stream::connect_with_password(
520                &proxy.addr,
521                target_addr,
522                &cred.username,
523                &cred.password,
524                timeout,
525            )?,
526            None => Socks5Stream::connect(&proxy.addr, target.clone(), timeout)?,
527        };
528        stream.get_mut().set_read_timeout(timeout)?;
529        stream.get_mut().set_write_timeout(timeout)?;
530
531        RawClient::new_ssl_from_stream(target, validate_domain, stream.into_inner())
532    }
533}
534
535#[derive(Debug)]
536enum ChannelMessage {
537    Response(serde_json::Value),
538    WakeUp,
539    Error(Arc<std::io::Error>),
540}
541
542impl<S: Read + Write> RawClient<S> {
543    // TODO: to enable this we have to find a way to allow concurrent read and writes to the
544    // underlying transport struct. This can be done pretty easily for TcpStream because it can be
545    // split into a "read" and a "write" object, but it's not as trivial for other types. Without
546    // such thing, this causes a deadlock, because the reader thread takes a lock on the
547    // `ClonableStream` before other threads can send a request to the server. They will block
548    // waiting for the reader to release the mutex, but this will never happen because the server
549    // didn't receive any request, so it has nothing to send back.
550    // pub fn reader_thread(&self) -> Result<(), Error> {
551    //     self._reader_thread(None).map(|_| ())
552    // }
553
554    fn _reader_thread(&self, until_message: Option<usize>) -> Result<serde_json::Value, Error> {
555        let mut raw_resp = String::new();
556        let resp = match self.buf_reader.try_lock() {
557            Ok(mut reader) => {
558                trace!(
559                    "Starting reader thread with `until_message` = {:?}",
560                    until_message
561                );
562
563                if let Some(until_message) = until_message {
564                    // If we are trying to start a reader thread but the corresponding sender is
565                    // missing from the map, exit immediately. We might have already received a
566                    // response for that id, but we don't know it yet. Exiting here forces the
567                    // calling code to fallback to the sender-receiver method, and it should find
568                    // a message there waiting for it.
569                    if self.waiting_map.lock()?.get(&until_message).is_none() {
570                        return Err(Error::CouldntLockReader);
571                    }
572                }
573
574                // Loop over every message
575                loop {
576                    raw_resp.clear();
577
578                    if let Err(e) = reader.read_line(&mut raw_resp) {
579                        let error = Arc::new(e);
580                        for (_, s) in self.waiting_map.lock().unwrap().drain() {
581                            s.send(ChannelMessage::Error(error.clone()))?;
582                        }
583                        return Err(Error::SharedIOError(error));
584                    }
585                    trace!("<== {}", raw_resp);
586
587                    let resp: serde_json::Value = serde_json::from_str(&raw_resp)?;
588
589                    // Normally there is and id, but it's missing for spontaneous notifications
590                    // from the server
591                    let resp_id = resp["id"]
592                        .as_str()
593                        .and_then(|s| s.parse().ok())
594                        .or_else(|| resp["id"].as_u64().map(|i| i as usize));
595                    match resp_id {
596                        Some(resp_id) if until_message == Some(resp_id) => {
597                            // We have a valid id and it's exactly the one we were waiting for!
598                            trace!(
599                                "Reader thread {} received a response for its request",
600                                resp_id
601                            );
602
603                            // Remove ourselves from the "waiting map"
604                            let mut map = self.waiting_map.lock()?;
605                            map.remove(&resp_id);
606
607                            // If the map is not empty, we select a random thread to become the
608                            // new reader thread.
609                            if let Some(err) = map.values().find_map(|sender| {
610                                sender
611                                    .send(ChannelMessage::WakeUp)
612                                    .map_err(|err| {
613                                        warn!("Unable to wake up a thread, trying some other");
614                                        err
615                                    })
616                                    .err()
617                            }) {
618                                error!("All the threads has failed, giving up");
619                                return Err(err)?;
620                            }
621
622                            break Ok(resp);
623                        }
624                        Some(resp_id) => {
625                            // We have an id, but it's not our response. Notify the thread and
626                            // move on
627                            trace!("Reader thread received response for {}", resp_id);
628
629                            if let Some(sender) = self.waiting_map.lock()?.remove(&resp_id) {
630                                sender.send(ChannelMessage::Response(resp))?;
631                            } else {
632                                warn!("Missing listener for {}", resp_id);
633                            }
634                        }
635                        None => {
636                            // No id, that's probably a notification.
637                            let mut resp = resp;
638
639                            if let Some(method) = resp["method"].take().as_str() {
640                                self.handle_notification(method, resp["params"].take())?;
641                            } else {
642                                warn!("Unexpected response: {:?}", resp);
643                            }
644                        }
645                    }
646                }
647            }
648            Err(TryLockError::WouldBlock) => {
649                // If we "WouldBlock" here it means that there's already a reader thread
650                // running somewhere.
651                Err(Error::CouldntLockReader)
652            }
653            Err(TryLockError::Poisoned(e)) => Err(e)?,
654        };
655
656        let resp = resp?;
657        if let Some(err) = resp.get("error") {
658            Err(Error::Protocol(err.clone()))
659        } else {
660            Ok(resp)
661        }
662    }
663
664    fn call(&self, req: Request) -> Result<serde_json::Value, Error> {
665        // Add our listener to the map before we send the request, to make sure we don't get a
666        // reply before the receiver is added
667        let (sender, receiver) = channel();
668        self.waiting_map.lock()?.insert(req.id, sender);
669
670        let mut raw = serde_json::to_vec(&req)?;
671        trace!("==> {}", String::from_utf8_lossy(&raw));
672
673        raw.extend_from_slice(b"\n");
674        let mut stream = self.stream.lock()?;
675        stream.write_all(&raw)?;
676        stream.flush()?;
677        drop(stream); // release the lock
678
679        self.increment_calls();
680
681        let mut resp = match self.recv(&receiver, req.id) {
682            Ok(resp) => resp,
683            e @ Err(_) => {
684                // In case of error our sender could still be left in the map, depending on where
685                // the error happened. Just in case, try to remove it here
686                self.waiting_map.lock()?.remove(&req.id);
687                return e;
688            }
689        };
690        Ok(resp["result"].take())
691    }
692
693    fn recv(
694        &self,
695        receiver: &Receiver<ChannelMessage>,
696        req_id: usize,
697    ) -> Result<serde_json::Value, Error> {
698        loop {
699            // Try to take the lock on the reader. If we manage to do so, we'll become the reader
700            // thread until we get our reponse
701            match self._reader_thread(Some(req_id)) {
702                Ok(response) => break Ok(response),
703                Err(Error::CouldntLockReader) => {
704                    match receiver.recv()? {
705                        // Received our response, returning it
706                        ChannelMessage::Response(received) => break Ok(received),
707                        ChannelMessage::WakeUp => {
708                            // We have been woken up, this means that we should try becoming the
709                            // reader thread ourselves
710                            trace!("WakeUp for {}", req_id);
711
712                            continue;
713                        }
714                        ChannelMessage::Error(e) => {
715                            warn!("Received ChannelMessage::Error");
716
717                            break Err(Error::SharedIOError(e));
718                        }
719                    }
720                }
721                e @ Err(_) => break e,
722            }
723        }
724    }
725
726    fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
727        match method {
728            "blockchain.headers.subscribe" => self.headers.lock()?.append(
729                &mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
730                    .into_iter()
731                    .collect(),
732            ),
733            "blockchain.scripthash.subscribe" => {
734                let unserialized: ScriptNotification = serde_json::from_value(result)?;
735                let mut script_notifications = self.script_notifications.lock()?;
736
737                let queue = script_notifications
738                    .get_mut(&unserialized.scripthash)
739                    .ok_or(Error::NotSubscribed(unserialized.scripthash))?;
740
741                queue.push_back(unserialized.status);
742            }
743            _ => info!("received unknown notification for method `{}`", method),
744        }
745
746        Ok(())
747    }
748
749    pub(crate) fn internal_raw_call_with_vec(
750        &self,
751        method_name: &str,
752        params: Vec<Param>,
753    ) -> Result<serde_json::Value, Error> {
754        let req = Request::new_id(
755            self.last_id.fetch_add(1, Ordering::SeqCst),
756            method_name,
757            params,
758        );
759        let result = self.call(req)?;
760
761        Ok(result)
762    }
763
764    #[inline]
765    #[cfg(feature = "debug-calls")]
766    fn increment_calls(&self) {
767        self.calls.fetch_add(1, Ordering::SeqCst);
768    }
769
770    #[inline]
771    #[cfg(not(feature = "debug-calls"))]
772    fn increment_calls(&self) {}
773}
774
775impl<T: Read + Write> ElectrumApi for RawClient<T> {
776    fn raw_call(
777        &self,
778        method_name: &str,
779        params: impl IntoIterator<Item = Param>,
780    ) -> Result<serde_json::Value, Error> {
781        self.internal_raw_call_with_vec(method_name, params.into_iter().collect())
782    }
783
784    fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
785        let mut raw = Vec::new();
786
787        let mut missing_responses = Vec::new();
788        let mut answers = BTreeMap::new();
789
790        // Add our listener to the map before we send the request
791
792        for (method, params) in batch.iter() {
793            let req = Request::new_id(
794                self.last_id.fetch_add(1, Ordering::SeqCst),
795                method,
796                params.to_vec(),
797            );
798            // Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map
799            // we can be sure that the response gets sent to the correct channel in self.recv
800            let (sender, receiver) = channel();
801            missing_responses.push((req.id, receiver));
802
803            self.waiting_map.lock()?.insert(req.id, sender);
804
805            raw.append(&mut serde_json::to_vec(&req)?);
806            raw.extend_from_slice(b"\n");
807        }
808
809        if missing_responses.is_empty() {
810            return Ok(vec![]);
811        }
812
813        trace!("==> {}", String::from_utf8_lossy(&raw));
814
815        let mut stream = self.stream.lock()?;
816        stream.write_all(&raw)?;
817        stream.flush()?;
818        drop(stream); // release the lock
819
820        self.increment_calls();
821
822        for (req_id, receiver) in missing_responses.iter() {
823            match self.recv(receiver, *req_id) {
824                Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
825                Err(e) => {
826                    // In case of error our sender could still be left in the map, depending on where
827                    // the error happened. Just in case, try to remove it here
828                    warn!("got error for req_id {}: {:?}", req_id, e);
829                    warn!("removing all waiting req of this batch");
830                    let mut guard = self.waiting_map.lock()?;
831                    for (req_id, _) in missing_responses.iter() {
832                        guard.remove(req_id);
833                    }
834                    return Err(e);
835                }
836            };
837        }
838
839        Ok(answers.into_values().collect())
840    }
841
842    fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
843        let req = Request::new_id(
844            self.last_id.fetch_add(1, Ordering::SeqCst),
845            "blockchain.headers.subscribe",
846            vec![],
847        );
848        let value = self.call(req)?;
849
850        Ok(serde_json::from_value(value)?)
851    }
852
853    fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
854        Ok(self.headers.lock()?.pop_front())
855    }
856
857    fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
858        let req = Request::new_id(
859            self.last_id.fetch_add(1, Ordering::SeqCst),
860            "blockchain.block.header",
861            vec![Param::Usize(height)],
862        );
863        let result = self.call(req)?;
864
865        Ok(Vec::<u8>::from_hex(
866            result
867                .as_str()
868                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
869        )?)
870    }
871
872    fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
873        let req = Request::new_id(
874            self.last_id.fetch_add(1, Ordering::SeqCst),
875            "blockchain.block.headers",
876            vec![Param::Usize(start_height), Param::Usize(count)],
877        );
878        let result = self.call(req)?;
879
880        let mut deserialized: GetHeadersRes = serde_json::from_value(result)?;
881        for i in 0..deserialized.count {
882            let (start, end) = (i * 80, (i + 1) * 80);
883            deserialized
884                .headers
885                .push(deserialize(&deserialized.raw_headers[start..end])?);
886        }
887        deserialized.raw_headers.clear();
888
889        Ok(deserialized)
890    }
891
892    fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
893        let req = Request::new_id(
894            self.last_id.fetch_add(1, Ordering::SeqCst),
895            "blockchain.estimatefee",
896            vec![Param::Usize(number)],
897        );
898        let result = self.call(req)?;
899
900        result
901            .as_f64()
902            .ok_or_else(|| Error::InvalidResponse(result.clone()))
903    }
904
905    fn relay_fee(&self) -> Result<f64, Error> {
906        let req = Request::new_id(
907            self.last_id.fetch_add(1, Ordering::SeqCst),
908            "blockchain.relayfee",
909            vec![],
910        );
911        let result = self.call(req)?;
912
913        result
914            .as_f64()
915            .ok_or_else(|| Error::InvalidResponse(result.clone()))
916    }
917
918    fn script_subscribe(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
919        let script_hash = script.to_electrum_scripthash();
920        let mut script_notifications = self.script_notifications.lock()?;
921
922        if script_notifications.contains_key(&script_hash) {
923            return Err(Error::AlreadySubscribed(script_hash));
924        }
925
926        script_notifications.insert(script_hash, VecDeque::new());
927        drop(script_notifications);
928
929        let req = Request::new_id(
930            self.last_id.fetch_add(1, Ordering::SeqCst),
931            "blockchain.scripthash.subscribe",
932            vec![Param::String(script_hash.to_hex())],
933        );
934        let value = self.call(req)?;
935
936        Ok(serde_json::from_value(value)?)
937    }
938
939    fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
940    where
941        I: IntoIterator + Clone,
942        I::Item: Borrow<&'s Script>,
943    {
944        {
945            let mut script_notifications = self.script_notifications.lock()?;
946
947            for script in scripts.clone() {
948                let script_hash = script.borrow().to_electrum_scripthash();
949                if script_notifications.contains_key(&script_hash) {
950                    return Err(Error::AlreadySubscribed(script_hash));
951                }
952                script_notifications.insert(script_hash, VecDeque::new());
953            }
954        }
955        impl_batch_call!(self, scripts, script_subscribe)
956    }
957
958    fn script_unsubscribe(&self, script: &Script) -> Result<bool, Error> {
959        let script_hash = script.to_electrum_scripthash();
960        let mut script_notifications = self.script_notifications.lock()?;
961
962        if !script_notifications.contains_key(&script_hash) {
963            return Err(Error::NotSubscribed(script_hash));
964        }
965
966        let req = Request::new_id(
967            self.last_id.fetch_add(1, Ordering::SeqCst),
968            "blockchain.scripthash.unsubscribe",
969            vec![Param::String(script_hash.to_hex())],
970        );
971        let value = self.call(req)?;
972        let answer = serde_json::from_value(value)?;
973
974        script_notifications.remove(&script_hash);
975
976        Ok(answer)
977    }
978
979    fn script_pop(&self, script: &Script) -> Result<Option<ScriptStatus>, Error> {
980        let script_hash = script.to_electrum_scripthash();
981
982        match self.script_notifications.lock()?.get_mut(&script_hash) {
983            None => Err(Error::NotSubscribed(script_hash)),
984            Some(queue) => Ok(queue.pop_front()),
985        }
986    }
987
988    fn script_get_balance(&self, script: &Script) -> Result<GetBalanceRes, Error> {
989        let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
990        let req = Request::new_id(
991            self.last_id.fetch_add(1, Ordering::SeqCst),
992            "blockchain.scripthash.get_balance",
993            params,
994        );
995        let result = self.call(req)?;
996
997        Ok(serde_json::from_value(result)?)
998    }
999    fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
1000    where
1001        I: IntoIterator + Clone,
1002        I::Item: Borrow<&'s Script>,
1003    {
1004        impl_batch_call!(self, scripts, script_get_balance)
1005    }
1006
1007    fn script_get_history(&self, script: &Script) -> Result<Vec<GetHistoryRes>, Error> {
1008        let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
1009        let req = Request::new_id(
1010            self.last_id.fetch_add(1, Ordering::SeqCst),
1011            "blockchain.scripthash.get_history",
1012            params,
1013        );
1014        let result = self.call(req)?;
1015
1016        Ok(serde_json::from_value(result)?)
1017    }
1018    fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
1019    where
1020        I: IntoIterator + Clone,
1021        I::Item: Borrow<&'s Script>,
1022    {
1023        impl_batch_call!(self, scripts, script_get_history)
1024    }
1025
1026    fn script_list_unspent(&self, script: &Script) -> Result<Vec<ListUnspentRes>, Error> {
1027        let params = vec![Param::String(script.to_electrum_scripthash().to_hex())];
1028        let req = Request::new_id(
1029            self.last_id.fetch_add(1, Ordering::SeqCst),
1030            "blockchain.scripthash.listunspent",
1031            params,
1032        );
1033        let result = self.call(req)?;
1034        let mut result: Vec<ListUnspentRes> = serde_json::from_value(result)?;
1035
1036        // This should not be necessary, since the protocol documentation says that the txs should
1037        // be "in blockchain order" (https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-listunspent).
1038        // However, elects seems to be ignoring this at the moment, so we'll sort again here just
1039        // to make sure the result is consistent.
1040        result.sort_unstable_by_key(|k| (k.height, k.tx_pos));
1041        Ok(result)
1042    }
1043
1044    fn batch_script_list_unspent<'s, I>(
1045        &self,
1046        scripts: I,
1047    ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
1048    where
1049        I: IntoIterator + Clone,
1050        I::Item: Borrow<&'s Script>,
1051    {
1052        impl_batch_call!(self, scripts, script_list_unspent)
1053    }
1054
1055    fn transaction_get_raw(&self, txid: &Txid) -> Result<Vec<u8>, Error> {
1056        let params = vec![Param::String(format!("{:x}", txid))];
1057        let req = Request::new_id(
1058            self.last_id.fetch_add(1, Ordering::SeqCst),
1059            "blockchain.transaction.get",
1060            params,
1061        );
1062        let result = self.call(req)?;
1063
1064        Ok(Vec::<u8>::from_hex(
1065            result
1066                .as_str()
1067                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
1068        )?)
1069    }
1070
1071    fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
1072    where
1073        I: IntoIterator + Clone,
1074        I::Item: Borrow<&'t Txid>,
1075    {
1076        let txs_string: Result<Vec<String>, Error> = impl_batch_call!(self, txids, transaction_get);
1077        txs_string?
1078            .iter()
1079            .map(|s| Ok(Vec::<u8>::from_hex(s)?))
1080            .collect()
1081    }
1082
1083    fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
1084    where
1085        I: IntoIterator + Clone,
1086        I::Item: Borrow<u32>,
1087    {
1088        let headers_string: Result<Vec<String>, Error> =
1089            impl_batch_call!(self, heights, block_header, apply_deref);
1090        headers_string?
1091            .iter()
1092            .map(|s| Ok(Vec::<u8>::from_hex(s)?))
1093            .collect()
1094    }
1095
1096    fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
1097    where
1098        I: IntoIterator + Clone,
1099        I::Item: Borrow<usize>,
1100    {
1101        impl_batch_call!(self, numbers, estimate_fee, apply_deref)
1102    }
1103
1104    fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
1105        let params = vec![Param::String(raw_tx.to_lower_hex_string())];
1106        let req = Request::new_id(
1107            self.last_id.fetch_add(1, Ordering::SeqCst),
1108            "blockchain.transaction.broadcast",
1109            params,
1110        );
1111        let result = self.call(req)?;
1112
1113        Ok(serde_json::from_value(result)?)
1114    }
1115
1116    fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
1117        let params = vec![Param::String(format!("{:x}", txid)), Param::Usize(height)];
1118        let req = Request::new_id(
1119            self.last_id.fetch_add(1, Ordering::SeqCst),
1120            "blockchain.transaction.get_merkle",
1121            params,
1122        );
1123        let result = self.call(req)?;
1124
1125        Ok(serde_json::from_value(result)?)
1126    }
1127
1128    fn batch_transaction_get_merkle<I>(
1129        &self,
1130        txids_and_heights: I,
1131    ) -> Result<Vec<GetMerkleRes>, Error>
1132    where
1133        I: IntoIterator + Clone,
1134        I::Item: Borrow<(Txid, usize)>,
1135    {
1136        impl_batch_call!(self, txids_and_heights, transaction_get_merkle)
1137    }
1138
1139    fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
1140        let params = vec![Param::Usize(height), Param::Usize(tx_pos)];
1141        let req = Request::new_id(
1142            self.last_id.fetch_add(1, Ordering::SeqCst),
1143            "blockchain.transaction.id_from_pos",
1144            params,
1145        );
1146        let result = self.call(req)?;
1147
1148        Ok(serde_json::from_value(result)?)
1149    }
1150
1151    fn txid_from_pos_with_merkle(
1152        &self,
1153        height: usize,
1154        tx_pos: usize,
1155    ) -> Result<TxidFromPosRes, Error> {
1156        let params = vec![
1157            Param::Usize(height),
1158            Param::Usize(tx_pos),
1159            Param::Bool(true),
1160        ];
1161        let req = Request::new_id(
1162            self.last_id.fetch_add(1, Ordering::SeqCst),
1163            "blockchain.transaction.id_from_pos",
1164            params,
1165        );
1166        let result = self.call(req)?;
1167
1168        Ok(serde_json::from_value(result)?)
1169    }
1170
1171    fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
1172        let req = Request::new_id(
1173            self.last_id.fetch_add(1, Ordering::SeqCst),
1174            "server.features",
1175            vec![],
1176        );
1177        let result = self.call(req)?;
1178
1179        Ok(serde_json::from_value(result)?)
1180    }
1181
1182    fn ping(&self) -> Result<(), Error> {
1183        let req = Request::new_id(
1184            self.last_id.fetch_add(1, Ordering::SeqCst),
1185            "server.ping",
1186            vec![],
1187        );
1188        self.call(req)?;
1189
1190        Ok(())
1191    }
1192
1193    #[cfg(feature = "debug-calls")]
1194    fn calls_made(&self) -> Result<usize, Error> {
1195        Ok(self.calls.load(Ordering::SeqCst))
1196    }
1197}
1198
1199#[cfg(test)]
1200mod test {
1201    use std::str::FromStr;
1202
1203    use crate::utils;
1204
1205    use super::RawClient;
1206    use crate::api::ElectrumApi;
1207
1208    fn get_test_server() -> String {
1209        std::env::var("TEST_ELECTRUM_SERVER").unwrap_or("electrum.blockstream.info:50001".into())
1210    }
1211
1212    #[test]
1213    fn test_server_features_simple() {
1214        let client = RawClient::new(get_test_server(), None).unwrap();
1215
1216        let resp = client.server_features().unwrap();
1217        assert_eq!(
1218            resp.genesis_hash,
1219            [
1220                0, 0, 0, 0, 0, 25, 214, 104, 156, 8, 90, 225, 101, 131, 30, 147, 79, 247, 99, 174,
1221                70, 162, 166, 193, 114, 179, 241, 182, 10, 140, 226, 111
1222            ],
1223        );
1224        assert_eq!(resp.hash_function, Some("sha256".into()));
1225        assert_eq!(resp.pruning, None);
1226    }
1227
1228    #[test]
1229    #[ignore = "depends on a live server"]
1230    fn test_batch_response_ordering() {
1231        // The electrum.blockstream.info:50001 node always sends back ordered responses which will make this always pass.
1232        // However, many servers do not, so we use one of those servers for this test.
1233        let client = RawClient::new("exs.dyshek.org:50001", None).unwrap();
1234        let heights: Vec<u32> = vec![1, 4, 8, 12, 222, 6666, 12];
1235        let result_times = [
1236            1231469665, 1231470988, 1231472743, 1231474888, 1231770653, 1236456633, 1231474888,
1237        ];
1238        // Check ordering 10 times. This usually fails within 5 if ordering is incorrect.
1239        for _ in 0..10 {
1240            let results = client.batch_block_header(&heights).unwrap();
1241            for (index, result) in results.iter().enumerate() {
1242                assert_eq!(result_times[index], result.time);
1243            }
1244        }
1245    }
1246
1247    #[test]
1248    fn test_relay_fee() {
1249        let client = RawClient::new(get_test_server(), None).unwrap();
1250
1251        let resp = client.relay_fee().unwrap();
1252        assert_eq!(resp, 0.00001);
1253    }
1254
1255    #[test]
1256    fn test_estimate_fee() {
1257        let client = RawClient::new(get_test_server(), None).unwrap();
1258
1259        let resp = client.estimate_fee(10).unwrap();
1260        assert!(resp > 0.0);
1261    }
1262
1263    #[test]
1264    fn test_block_header() {
1265        let client = RawClient::new(get_test_server(), None).unwrap();
1266
1267        let resp = client.block_header(0).unwrap();
1268        assert_eq!(resp.version, bitcoin::block::Version::ONE);
1269        assert_eq!(resp.time, 1231006505);
1270        assert_eq!(resp.nonce, 0x7c2bac1d);
1271    }
1272
1273    #[test]
1274    fn test_block_header_raw() {
1275        let client = RawClient::new(get_test_server(), None).unwrap();
1276
1277        let resp = client.block_header_raw(0).unwrap();
1278        assert_eq!(
1279            resp,
1280            vec![
1281                1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1282                0, 0, 0, 0, 0, 0, 0, 0, 59, 163, 237, 253, 122, 123, 18, 178, 122, 199, 44, 62,
1283                103, 118, 143, 97, 127, 200, 27, 195, 136, 138, 81, 50, 58, 159, 184, 170, 75, 30,
1284                94, 74, 41, 171, 95, 73, 255, 255, 0, 29, 29, 172, 43, 124
1285            ]
1286        );
1287    }
1288
1289    #[test]
1290    fn test_block_headers() {
1291        let client = RawClient::new(get_test_server(), None).unwrap();
1292
1293        let resp = client.block_headers(0, 4).unwrap();
1294        assert_eq!(resp.count, 4);
1295        assert_eq!(resp.max, 2016);
1296        assert_eq!(resp.headers.len(), 4);
1297
1298        assert_eq!(resp.headers[0].time, 1231006505);
1299    }
1300
1301    #[test]
1302    fn test_script_get_balance() {
1303        use std::str::FromStr;
1304
1305        let client = RawClient::new(get_test_server(), None).unwrap();
1306
1307        // Realistically nobody will ever spend from this address, so we can expect the balance to
1308        // increase over time
1309        let addr = bitcoin::Address::from_str("1CounterpartyXXXXXXXXXXXXXXXUWLpVr")
1310            .unwrap()
1311            .assume_checked();
1312        let resp = client.script_get_balance(&addr.script_pubkey()).unwrap();
1313        assert!(resp.confirmed >= 213091301265);
1314    }
1315
1316    #[test]
1317    fn test_script_get_history() {
1318        use std::str::FromStr;
1319
1320        use bitcoin::Txid;
1321
1322        let client = RawClient::new(get_test_server(), None).unwrap();
1323
1324        // Mt.Gox hack address
1325        let addr = bitcoin::Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF")
1326            .unwrap()
1327            .assume_checked();
1328        let resp = client.script_get_history(&addr.script_pubkey()).unwrap();
1329
1330        assert!(resp.len() >= 328);
1331        assert_eq!(
1332            resp[0].tx_hash,
1333            Txid::from_str("e67a0550848b7932d7796aeea16ab0e48a5cfe81c4e8cca2c5b03e0416850114")
1334                .unwrap()
1335        );
1336    }
1337
1338    #[test]
1339    fn test_script_list_unspent() {
1340        use bitcoin::Txid;
1341        use std::str::FromStr;
1342
1343        let client = RawClient::new(get_test_server(), None).unwrap();
1344
1345        // Peter todd's sha256 bounty address https://bitcointalk.org/index.php?topic=293382.0
1346        let addr = bitcoin::Address::from_str("35Snmmy3uhaer2gTboc81ayCip4m9DT4ko")
1347            .unwrap()
1348            .assume_checked();
1349        let resp = client.script_list_unspent(&addr.script_pubkey()).unwrap();
1350
1351        assert!(resp.len() >= 9);
1352        let txid = "397f12ee15f8a3d2ab25c0f6bb7d3c64d2038ca056af10dd8251b98ae0f076b0";
1353        let txid = Txid::from_str(txid).unwrap();
1354        let txs: Vec<_> = resp.iter().filter(|e| e.tx_hash == txid).collect();
1355        assert_eq!(txs.len(), 1);
1356        assert_eq!(txs[0].value, 10000000);
1357        assert_eq!(txs[0].height, 257674);
1358        assert_eq!(txs[0].tx_pos, 1);
1359    }
1360
1361    #[test]
1362    fn test_batch_script_list_unspent() {
1363        use std::str::FromStr;
1364
1365        let client = RawClient::new(get_test_server(), None).unwrap();
1366
1367        // Peter todd's sha256 bounty address https://bitcointalk.org/index.php?topic=293382.0
1368        let script_1 = bitcoin::Address::from_str("35Snmmy3uhaer2gTboc81ayCip4m9DT4ko")
1369            .unwrap()
1370            .assume_checked()
1371            .script_pubkey();
1372
1373        let resp = client
1374            .batch_script_list_unspent(vec![script_1.as_script()])
1375            .unwrap();
1376        assert_eq!(resp.len(), 1);
1377        assert!(resp[0].len() >= 9);
1378    }
1379
1380    #[test]
1381    fn test_batch_estimate_fee() {
1382        let client = RawClient::new(get_test_server(), None).unwrap();
1383
1384        let resp = client.batch_estimate_fee(vec![10, 20]).unwrap();
1385        assert_eq!(resp.len(), 2);
1386        assert!(resp[0] > 0.0);
1387        assert!(resp[1] > 0.0);
1388    }
1389
1390    #[test]
1391    fn test_transaction_get() {
1392        use bitcoin::{transaction, Txid};
1393
1394        let client = RawClient::new(get_test_server(), None).unwrap();
1395
1396        let resp = client
1397            .transaction_get(
1398                &Txid::from_str("cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566")
1399                    .unwrap(),
1400            )
1401            .unwrap();
1402        assert_eq!(resp.version, transaction::Version::ONE);
1403        assert_eq!(resp.lock_time.to_consensus_u32(), 0);
1404    }
1405
1406    #[test]
1407    fn test_transaction_get_raw() {
1408        use bitcoin::Txid;
1409
1410        let client = RawClient::new(get_test_server(), None).unwrap();
1411
1412        let resp = client
1413            .transaction_get_raw(
1414                &Txid::from_str("cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566")
1415                    .unwrap(),
1416            )
1417            .unwrap();
1418        assert_eq!(
1419            resp,
1420            vec![
1421                1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1422                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 84, 3, 240, 156, 9, 27, 77,
1423                105, 110, 101, 100, 32, 98, 121, 32, 65, 110, 116, 80, 111, 111, 108, 49, 49, 57,
1424                174, 0, 111, 32, 7, 77, 101, 40, 250, 190, 109, 109, 42, 177, 148, 141, 80, 179,
1425                217, 145, 226, 160, 130, 29, 247, 67, 88, 237, 156, 37, 83, 175, 0, 199, 166, 31,
1426                151, 119, 28, 160, 172, 238, 16, 110, 4, 0, 0, 0, 0, 0, 0, 0, 203, 236, 0, 128, 36,
1427                97, 249, 5, 255, 255, 255, 255, 3, 84, 206, 172, 42, 0, 0, 0, 0, 25, 118, 169, 20,
1428                17, 219, 228, 140, 198, 182, 23, 249, 198, 173, 175, 77, 158, 213, 246, 37, 177,
1429                199, 203, 89, 136, 172, 0, 0, 0, 0, 0, 0, 0, 0, 38, 106, 36, 170, 33, 169, 237, 46,
1430                87, 139, 206, 44, 166, 198, 188, 147, 89, 55, 115, 69, 216, 233, 133, 221, 95, 144,
1431                199, 132, 33, 255, 166, 239, 165, 235, 96, 66, 142, 105, 140, 0, 0, 0, 0, 0, 0, 0,
1432                0, 38, 106, 36, 185, 225, 27, 109, 47, 98, 29, 126, 195, 244, 90, 94, 202, 137,
1433                211, 234, 106, 41, 76, 223, 58, 4, 46, 151, 48, 9, 88, 68, 112, 161, 41, 22, 17,
1434                30, 44, 170, 1, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1435                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
1436            ]
1437        )
1438    }
1439
1440    #[test]
1441    fn test_transaction_get_merkle() {
1442        use bitcoin::Txid;
1443
1444        let client = RawClient::new(get_test_server(), None).unwrap();
1445
1446        let txid =
1447            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1448                .unwrap();
1449        let resp = client.transaction_get_merkle(&txid, 630000).unwrap();
1450        assert_eq!(resp.block_height, 630000);
1451        assert_eq!(resp.pos, 68);
1452        assert_eq!(resp.merkle.len(), 12);
1453        assert_eq!(
1454            resp.merkle[0],
1455            [
1456                34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47, 66,
1457                179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25
1458            ]
1459        );
1460
1461        // Check we can verify the merkle proof validity, but fail if we supply wrong data.
1462        let block_header = client.block_header(resp.block_height).unwrap();
1463        assert!(utils::validate_merkle_proof(
1464            &txid,
1465            &block_header.merkle_root,
1466            &resp
1467        ));
1468
1469        let mut fail_resp = resp.clone();
1470        fail_resp.pos = 13;
1471        assert!(!utils::validate_merkle_proof(
1472            &txid,
1473            &block_header.merkle_root,
1474            &fail_resp
1475        ));
1476
1477        let fail_block_header = client.block_header(resp.block_height + 1).unwrap();
1478        assert!(!utils::validate_merkle_proof(
1479            &txid,
1480            &fail_block_header.merkle_root,
1481            &resp
1482        ));
1483    }
1484
1485    #[test]
1486    fn test_batch_transaction_get_merkle() {
1487        use bitcoin::Txid;
1488
1489        struct TestCase {
1490            txid: Txid,
1491            block_height: usize,
1492            exp_pos: usize,
1493            exp_bytes: [u8; 32],
1494        }
1495
1496        let client = RawClient::new(get_test_server(), None).unwrap();
1497
1498        let test_cases: Vec<TestCase> = vec![
1499            TestCase {
1500                txid: Txid::from_str(
1501                    "1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d",
1502                )
1503                .unwrap(),
1504                block_height: 630000,
1505                exp_pos: 68,
1506                exp_bytes: [
1507                    34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47,
1508                    66, 179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25,
1509                ],
1510            },
1511            TestCase {
1512                txid: Txid::from_str(
1513                    "70a8639bc9b743c0610d1231103a2f8e99f4a25670946b91f16c55a5373b37d1",
1514                )
1515                .unwrap(),
1516                block_height: 630001,
1517                exp_pos: 25,
1518                exp_bytes: [
1519                    169, 100, 34, 99, 168, 101, 25, 168, 184, 90, 77, 50, 151, 245, 130, 101, 193,
1520                    229, 136, 128, 63, 110, 241, 19, 242, 59, 184, 137, 245, 249, 188, 110,
1521                ],
1522            },
1523            TestCase {
1524                txid: Txid::from_str(
1525                    "a0db149ace545beabbd87a8d6b20ffd6aa3b5a50e58add49a3d435f898c272cf",
1526                )
1527                .unwrap(),
1528                block_height: 840000,
1529                exp_pos: 0,
1530                exp_bytes: [
1531                    43, 184, 95, 75, 0, 75, 230, 218, 84, 247, 102, 193, 124, 30, 133, 81, 135, 50,
1532                    113, 18, 194, 49, 239, 47, 243, 94, 186, 208, 234, 103, 198, 158,
1533                ],
1534            },
1535        ];
1536
1537        let txids_and_heights: Vec<(Txid, usize)> = test_cases
1538            .iter()
1539            .map(|case| (case.txid, case.block_height))
1540            .collect();
1541
1542        let resp = client
1543            .batch_transaction_get_merkle(&txids_and_heights)
1544            .unwrap();
1545
1546        for (i, (res, test_case)) in resp.iter().zip(test_cases).enumerate() {
1547            assert_eq!(res.block_height, test_case.block_height);
1548            assert_eq!(res.pos, test_case.exp_pos);
1549            assert_eq!(res.merkle.len(), 12);
1550            assert_eq!(res.merkle[0], test_case.exp_bytes);
1551
1552            // Check we can verify the merkle proof validity, but fail if we supply wrong data.
1553            let block_header = client.block_header(res.block_height).unwrap();
1554            assert!(utils::validate_merkle_proof(
1555                &txids_and_heights[i].0,
1556                &block_header.merkle_root,
1557                res
1558            ));
1559
1560            let mut fail_res = res.clone();
1561            fail_res.pos = 13;
1562            assert!(!utils::validate_merkle_proof(
1563                &txids_and_heights[i].0,
1564                &block_header.merkle_root,
1565                &fail_res
1566            ));
1567
1568            let fail_block_header = client.block_header(res.block_height + 1).unwrap();
1569            assert!(!utils::validate_merkle_proof(
1570                &txids_and_heights[i].0,
1571                &fail_block_header.merkle_root,
1572                res
1573            ));
1574        }
1575    }
1576
1577    #[test]
1578    fn test_txid_from_pos() {
1579        use bitcoin::Txid;
1580
1581        let client = RawClient::new(get_test_server(), None).unwrap();
1582
1583        let txid =
1584            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1585                .unwrap();
1586        let resp = client.txid_from_pos(630000, 68).unwrap();
1587        assert_eq!(resp, txid);
1588    }
1589
1590    #[test]
1591    fn test_txid_from_pos_with_merkle() {
1592        use bitcoin::Txid;
1593
1594        let client = RawClient::new(get_test_server(), None).unwrap();
1595
1596        let txid =
1597            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1598                .unwrap();
1599        let resp = client.txid_from_pos_with_merkle(630000, 68).unwrap();
1600        assert_eq!(resp.tx_hash, txid);
1601        assert_eq!(
1602            resp.merkle[0],
1603            [
1604                34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47, 66,
1605                179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25
1606            ]
1607        );
1608    }
1609
1610    #[test]
1611    fn test_ping() {
1612        let client = RawClient::new(get_test_server(), None).unwrap();
1613        client.ping().unwrap();
1614    }
1615
1616    #[test]
1617    fn test_block_headers_subscribe() {
1618        let client = RawClient::new(get_test_server(), None).unwrap();
1619        let resp = client.block_headers_subscribe().unwrap();
1620
1621        assert!(resp.height >= 639000);
1622    }
1623
1624    #[test]
1625    fn test_script_subscribe() {
1626        use std::str::FromStr;
1627
1628        let client = RawClient::new(get_test_server(), None).unwrap();
1629
1630        // Mt.Gox hack address
1631        let addr = bitcoin::Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF")
1632            .unwrap()
1633            .assume_checked();
1634
1635        // Just make sure that the call returns Ok(something)
1636        client.script_subscribe(&addr.script_pubkey()).unwrap();
1637    }
1638
1639    #[test]
1640    fn test_request_after_error() {
1641        let client = RawClient::new(get_test_server(), None).unwrap();
1642
1643        assert!(client.transaction_broadcast_raw(&[0x00]).is_err());
1644        assert!(client.server_features().is_ok());
1645    }
1646
1647    #[test]
1648    fn test_raw_call() {
1649        use crate::types::Param;
1650
1651        let client = RawClient::new(get_test_server(), None).unwrap();
1652
1653        let params = vec![
1654            Param::String(
1655                "cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566".to_string(),
1656            ),
1657            Param::Bool(false),
1658        ];
1659
1660        let resp = client
1661            .raw_call("blockchain.transaction.get", params)
1662            .unwrap();
1663
1664        assert_eq!(
1665            resp,
1666            "01000000000101000000000000000000000000000000000000000000000000000\
1667            0000000000000ffffffff5403f09c091b4d696e656420627920416e74506f6f6c3\
1668            13139ae006f20074d6528fabe6d6d2ab1948d50b3d991e2a0821df74358ed9c255\
1669            3af00c7a61f97771ca0acee106e0400000000000000cbec00802461f905fffffff\
1670            f0354ceac2a000000001976a91411dbe48cc6b617f9c6adaf4d9ed5f625b1c7cb5\
1671            988ac0000000000000000266a24aa21a9ed2e578bce2ca6c6bc9359377345d8e98\
1672            5dd5f90c78421ffa6efa5eb60428e698c0000000000000000266a24b9e11b6d2f6\
1673            21d7ec3f45a5eca89d3ea6a294cdf3a042e973009584470a12916111e2caa01200\
1674            000000000000000000000000000000000000000000000000000000000000000000\
1675            00000"
1676        )
1677    }
1678}