electrum/
raw_client.rs

1//! Raw client
2//!
3//! This module contains the definition of the raw client that wraps the transport method
4
5use amplify::hex::{FromHex, ToHex};
6use bp::{BlockHash, BlockHeader, ConsensusDecode, ScriptPubkey, Tx, Txid};
7use std::borrow::Borrow;
8use std::collections::{BTreeMap, HashMap, VecDeque};
9use std::io::{BufRead, BufReader, Read, Write};
10use std::mem::drop;
11use std::net::{TcpStream, ToSocketAddrs};
12use std::sync::atomic::{AtomicUsize, Ordering};
13use std::sync::mpsc::{channel, Receiver, Sender};
14use std::sync::{Arc, Mutex, TryLockError};
15use std::time::Duration;
16
17#[allow(unused_imports)]
18use log::{debug, error, info, trace, warn};
19
20#[cfg(feature = "use-openssl")]
21use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode};
22
23#[cfg(all(
24    any(
25        feature = "default",
26        feature = "use-rustls",
27        feature = "use-rustls-ring"
28    ),
29    not(feature = "use-openssl")
30))]
31use rustls::{pki_types::ServerName, ClientConfig, ClientConnection, RootCertStore, StreamOwned};
32
33#[cfg(any(feature = "default", feature = "proxy"))]
34use crate::socks::{Socks5Stream, TargetAddr, ToTargetAddr};
35
36use crate::stream::ClonableStream;
37
38use crate::api::ElectrumApi;
39use crate::batch::Batch;
40use crate::types::*;
41
// Builds a `Batch` by calling `$call` once per item of `$data`, sends it with
// `$self.batch_call`, and deserializes every returned value.
//
// The optional fourth argument `apply_deref` makes the macro pass `*item.borrow()` instead of
// `item.borrow()` to the batch method.
macro_rules! impl_batch_call {
    // Items are handed to the batch method as `item.borrow()`.
    ( $self:expr, $data:expr, $call:ident ) => {{
        impl_batch_call!($self, $data, $call, )
    }};

    // Items are handed to the batch method as `*item.borrow()`.
    ( $self:expr, $data:expr, $call:ident, apply_deref ) => {{
        impl_batch_call!($self, $data, $call, *)
    }};

    // Worker arm: `$apply_deref` is either empty or a single `*` token.
    ( $self:expr, $data:expr, $call:ident, $($apply_deref:tt)? ) => {{
        let mut batch = Batch::default();
        for item in $data {
            batch.$call($($apply_deref)* item.borrow());
        }

        // Stops at the first value that fails to deserialize, like the `?` in a loop would.
        $self
            .batch_call(&batch)?
            .into_iter()
            .map(|value| serde_json::from_value(value).map_err(Error::from))
            .collect()
    }};
}
67
/// Extension of [`ToSocketAddrs`](https://doc.rust-lang.org/std/net/trait.ToSocketAddrs.html)
/// for address types that may also carry a domain name.
///
/// The domain is what an SSL client checks the server's certificate against.
pub trait ToSocketAddrsDomain: ToSocketAddrs {
    /// Returns the domain part of the address.
    ///
    /// The default implementation reports that there is no domain by returning `None`.
    fn domain(&self) -> Option<&str> {
        None
    }
}
77
78impl ToSocketAddrsDomain for &str {
79    fn domain(&self) -> Option<&str> {
80        self.split(':').next()
81    }
82}
83
84impl ToSocketAddrsDomain for (&str, u16) {
85    fn domain(&self) -> Option<&str> {
86        self.0.domain()
87    }
88}
89
#[cfg(any(feature = "default", feature = "proxy"))]
impl ToSocketAddrsDomain for TargetAddr {
    /// Returns the host name of a `TargetAddr::Domain`; a `TargetAddr::Ip` has no domain.
    fn domain(&self) -> Option<&str> {
        if let TargetAddr::Domain(host, _) = self {
            Some(host.as_str())
        } else {
            None
        }
    }
}
99
100macro_rules! impl_to_socket_addrs_domain {
101    ( $ty:ty ) => {
102        impl ToSocketAddrsDomain for $ty {}
103    };
104}
105
106impl_to_socket_addrs_domain!(std::net::SocketAddr);
107impl_to_socket_addrs_domain!(std::net::SocketAddrV4);
108impl_to_socket_addrs_domain!(std::net::SocketAddrV6);
109impl_to_socket_addrs_domain!((std::net::IpAddr, u16));
110impl_to_socket_addrs_domain!((std::net::Ipv4Addr, u16));
111impl_to_socket_addrs_domain!((std::net::Ipv6Addr, u16));
112
113/// Instance of an Electrum client
114///
115/// A `Client` maintains a constant connection with an Electrum server and exposes methods to
116/// interact with it. It can also subscribe and receive notifications from the server about new
117/// blocks or activity on a specific *scriptPubKey*.
118///
119/// The `Client` is modeled in such a way that allows the external caller to have full control over
120/// its functionality: no threads or tasks are spawned internally to monitor the state of the
121/// connection.
122///
123/// More transport methods can be used by manually creating an instance of this struct with an
124/// arbitrary `S` type.
125#[derive(Debug)]
126pub struct RawClient<S>
127where
128    S: Read + Write,
129{
130    stream: Mutex<ClonableStream<S>>,
131    buf_reader: Mutex<BufReader<ClonableStream<S>>>,
132
133    last_id: AtomicUsize,
134    waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,
135
136    headers: Mutex<VecDeque<RawHeaderNotification>>,
137    script_notifications: Mutex<HashMap<ScriptHash, VecDeque<ScriptStatus>>>,
138
139    #[cfg(feature = "debug-calls")]
140    calls: AtomicUsize,
141}
142
143impl<S> From<S> for RawClient<S>
144where
145    S: Read + Write,
146{
147    fn from(stream: S) -> Self {
148        let stream: ClonableStream<_> = stream.into();
149
150        Self {
151            buf_reader: Mutex::new(BufReader::new(stream.clone())),
152            stream: Mutex::new(stream),
153
154            last_id: AtomicUsize::new(0),
155            waiting_map: Mutex::new(HashMap::new()),
156
157            headers: Mutex::new(VecDeque::new()),
158            script_notifications: Mutex::new(HashMap::new()),
159
160            #[cfg(feature = "debug-calls")]
161            calls: AtomicUsize::new(0),
162        }
163    }
164}
165
166/// Transport type used to establish a plaintext TCP connection with the server
167pub type ElectrumPlaintextStream = TcpStream;
168impl RawClient<ElectrumPlaintextStream> {
169    /// Creates a new plaintext client and tries to connect to `socket_addr`.
170    pub fn new<A: ToSocketAddrs>(
171        socket_addrs: A,
172        timeout: Option<Duration>,
173    ) -> Result<Self, Error> {
174        let stream = match timeout {
175            Some(timeout) => {
176                let stream = connect_with_total_timeout(socket_addrs, timeout)?;
177                stream.set_read_timeout(Some(timeout))?;
178                stream.set_write_timeout(Some(timeout))?;
179                stream
180            }
181            None => TcpStream::connect(socket_addrs)?,
182        };
183
184        Ok(stream.into())
185    }
186}
187
188fn connect_with_total_timeout<A: ToSocketAddrs>(
189    socket_addrs: A,
190    mut timeout: Duration,
191) -> Result<TcpStream, Error> {
192    // Use the same algorithm as curl: 1/2 on the first host, 1/4 on the second one, etc.
193    // https://curl.se/mail/lib-2014-11/0164.html
194
195    let mut errors = Vec::new();
196
197    let addrs = socket_addrs
198        .to_socket_addrs()?
199        .enumerate()
200        .collect::<Vec<_>>();
201    for (index, addr) in &addrs {
202        if *index < addrs.len() - 1 {
203            timeout = timeout.div_f32(2.0);
204        }
205
206        info!(
207            "Trying to connect to {} (attempt {}/{}) with timeout {:?}",
208            addr,
209            index + 1,
210            addrs.len(),
211            timeout
212        );
213        match TcpStream::connect_timeout(addr, timeout) {
214            Ok(socket) => return Ok(socket),
215            Err(e) => {
216                warn!("Connection error: {:?}", e);
217                errors.push(e.into());
218            }
219        }
220    }
221
222    Err(Error::AllAttemptsErrored(errors))
223}
224
#[cfg(feature = "use-openssl")]
/// Transport type used to establish an OpenSSL TLS encrypted/authenticated connection with the server
pub type ElectrumSslStream = SslStream<TcpStream>;
#[cfg(feature = "use-openssl")]
impl RawClient<ElectrumSslStream> {
    /// Creates a new SSL client and tries to connect to `socket_addrs`. Optionally, if
    /// `validate_domain` is `true`, validate the server's certificate.
    ///
    /// With `Some(timeout)` the TCP connection goes through `connect_with_total_timeout` and
    /// the same duration becomes the socket's read and write timeout; with `None` a plain
    /// blocking connect is used.
    ///
    /// # Errors
    /// `Error::MissingDomain` if validation is requested but `socket_addrs` has no domain,
    /// plus any connection or handshake error.
    pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
        socket_addrs: A,
        validate_domain: bool,
        timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        debug!(
            "new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
            socket_addrs.domain(),
            validate_domain,
            timeout
        );
        // Fail before touching the network when there is nothing to validate against.
        if validate_domain && socket_addrs.domain().is_none() {
            return Err(Error::MissingDomain);
        }

        let tcp_stream = match timeout {
            Some(limit) => {
                let connected = connect_with_total_timeout(socket_addrs.clone(), limit)?;
                connected.set_read_timeout(Some(limit))?;
                connected.set_write_timeout(Some(limit))?;
                connected
            }
            None => TcpStream::connect(socket_addrs.clone())?,
        };

        Self::new_ssl_from_stream(socket_addrs, validate_domain, tcp_stream)
    }

    /// Create a new SSL client using an existing TcpStream
    ///
    /// When `validate_domain` is `false` certificate verification is switched off on the
    /// connector. The name handed to the handshake is the domain of `socket_addrs`, or the
    /// literal `"NONE"` when it has none.
    pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
        socket_addrs: A,
        validate_domain: bool,
        stream: TcpStream,
    ) -> Result<Self, Error> {
        let mut builder =
            SslConnector::builder(SslMethod::tls()).map_err(Error::InvalidSslMethod)?;
        let host = socket_addrs.domain();

        // TODO: support for certificate pinning
        if !validate_domain {
            builder.set_verify(SslVerifyMode::NONE);
        } else if host.is_none() {
            return Err(Error::MissingDomain);
        }

        let tls_stream = builder
            .build()
            .connect(host.unwrap_or("NONE"), stream)
            .map_err(Error::SslHandshakeError)?;

        Ok(Self::from(tls_stream))
    }
}
285
#[cfg(all(
    any(
        feature = "default",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ),
    not(feature = "use-openssl")
))]
mod danger {
    use rustls::client::danger::{
        HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier,
    };
    use rustls::crypto::CryptoProvider;
    use rustls::pki_types::{CertificateDer, ServerName, UnixTime};
    use rustls::{DigitallySignedStruct, Error as TlsError, SignatureScheme};

    /// Certificate verifier that accepts every certificate and every handshake signature.
    ///
    /// It performs no validation at all. The wrapped provider is only consulted to report
    /// which signature schemes are supported.
    #[derive(Debug)]
    pub struct NoCertificateVerification(CryptoProvider);

    impl NoCertificateVerification {
        /// Wraps `provider`, whose signature algorithms are advertised by the verifier.
        pub fn new(provider: CryptoProvider) -> Self {
            Self(provider)
        }
    }

    impl ServerCertVerifier for NoCertificateVerification {
        /// Always reports the server certificate as verified, ignoring every argument.
        fn verify_server_cert(
            &self,
            _end_entity: &CertificateDer<'_>,
            _intermediates: &[CertificateDer<'_>],
            _server_name: &ServerName<'_>,
            _ocsp: &[u8],
            _now: UnixTime,
        ) -> Result<ServerCertVerified, TlsError> {
            Ok(ServerCertVerified::assertion())
        }

        /// Always reports the TLS 1.2 handshake signature as valid.
        fn verify_tls12_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, TlsError> {
            Ok(HandshakeSignatureValid::assertion())
        }

        /// Always reports the TLS 1.3 handshake signature as valid.
        fn verify_tls13_signature(
            &self,
            _message: &[u8],
            _cert: &CertificateDer<'_>,
            _dss: &DigitallySignedStruct,
        ) -> Result<HandshakeSignatureValid, TlsError> {
            Ok(HandshakeSignatureValid::assertion())
        }

        /// Lists the schemes of the wrapped provider's signature verification algorithms.
        fn supported_verify_schemes(&self) -> Vec<SignatureScheme> {
            let Self(provider) = self;
            provider.signature_verification_algorithms.supported_schemes()
        }
    }
}
345
#[cfg(all(
    any(
        feature = "default",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ),
    not(feature = "use-openssl")
))]
/// Transport type used to establish a Rustls TLS encrypted/authenticated connection with the server
pub type ElectrumSslStream = StreamOwned<ClientConnection, TcpStream>;
#[cfg(all(
    any(
        feature = "default",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ),
    not(feature = "use-openssl")
))]
impl RawClient<ElectrumSslStream> {
    /// Creates a new SSL client and tries to connect to `socket_addrs`. Optionally, if
    /// `validate_domain` is `true`, validate the server's certificate.
    ///
    /// With `Some(timeout)` the TCP connection goes through `connect_with_total_timeout` and
    /// the same duration becomes the socket's read and write timeout; with `None` a plain
    /// blocking connect is used.
    ///
    /// # Errors
    /// `Error::MissingDomain` if validation is requested but `socket_addrs` has no domain,
    /// plus any connection or TLS setup error.
    pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
        socket_addrs: A,
        validate_domain: bool,
        timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        debug!(
            "new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
            socket_addrs.domain(),
            validate_domain,
            timeout
        );
        // Fail before touching the network when there is nothing to validate against.
        if validate_domain && socket_addrs.domain().is_none() {
            return Err(Error::MissingDomain);
        }

        let tcp_stream = match timeout {
            Some(limit) => {
                let connected = connect_with_total_timeout(socket_addrs.clone(), limit)?;
                connected.set_read_timeout(Some(limit))?;
                connected.set_write_timeout(Some(limit))?;
                connected
            }
            None => TcpStream::connect(socket_addrs.clone())?,
        };

        Self::new_ssl_from_stream(socket_addrs, validate_domain, tcp_stream)
    }

    /// Create a new SSL client using an existing TcpStream
    ///
    /// With `validate_domain` the configuration trusts the `webpki_roots` certificates;
    /// without it a verifier that accepts any certificate is installed. The server name used
    /// for the session is the domain of `socket_addr`, or the literal `"NONE"` when it has
    /// none.
    pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
        socket_addr: A,
        validate_domain: bool,
        tcp_stream: TcpStream,
    ) -> Result<Self, Error> {
        // The result is ignored on purpose: installing fails when a default is already set.
        #[cfg(feature = "use-rustls")]
        let _ = rustls::crypto::aws_lc_rs::default_provider().install_default();
        let builder = ClientConfig::builder();
        let host = socket_addr.domain();

        let config = if validate_domain {
            if host.is_none() {
                return Err(Error::MissingDomain);
            }

            let store: RootCertStore = webpki_roots::TLS_SERVER_ROOTS.iter().cloned().collect();

            // TODO: cert pinning
            builder.with_root_certificates(store).with_no_client_auth()
        } else {
            builder
                .dangerous()
                .with_custom_certificate_verifier(Arc::new(
                    #[cfg(feature = "use-rustls")]
                    danger::NoCertificateVerification::new(rustls::crypto::aws_lc_rs::default_provider()),
                    #[cfg(feature = "use-rustls-ring")]
                    danger::NoCertificateVerification::new(rustls::crypto::ring::default_provider()),
                ))
                .with_no_client_auth()
        };

        let name = host.unwrap_or("NONE").to_string();
        let server_name =
            ServerName::try_from(name.clone()).map_err(|_| Error::InvalidDNSNameError(name))?;
        let session = ClientConnection::new(Arc::new(config), server_name)
            .map_err(Error::CouldNotCreateConnection)?;

        Ok(Self::from(StreamOwned::new(session, tcp_stream)))
    }
}
439
#[cfg(any(feature = "default", feature = "proxy"))]
/// Transport type used to establish a connection to a server through a socks proxy
pub type ElectrumProxyStream = Socks5Stream;
#[cfg(any(feature = "default", feature = "proxy"))]
impl RawClient<ElectrumProxyStream> {
    /// Creates a new socks client and tries to connect to `target_addr` using `proxy_addr` as a
    /// socks proxy server. The DNS resolution of `target_addr`, if required, is done
    /// through the proxy. This allows to specify, for instance, `.onion` addresses.
    ///
    /// The proxy credentials are used when `proxy.credentials` is set. `timeout` is passed to
    /// the socks connection and then installed as read and write timeout on the stream.
    pub fn new_proxy<T: ToTargetAddr>(
        target_addr: T,
        proxy: &crate::Socks5Config,
        timeout: Option<Duration>,
    ) -> Result<Self, Error> {
        let mut stream = match proxy.credentials.as_ref() {
            Some(cred) => Socks5Stream::connect_with_password(
                &proxy.addr,
                target_addr,
                &cred.username,
                &cred.password,
                timeout,
            )?,
            None => Socks5Stream::connect(&proxy.addr, target_addr, timeout)?,
        };
        stream.get_mut().set_read_timeout(timeout)?;
        stream.get_mut().set_write_timeout(timeout)?;

        Ok(stream.into())
    }

    #[cfg(any(
        feature = "use-openssl",
        feature = "use-rustls",
        feature = "use-rustls-ring"
    ))]
    /// Creates a new TLS client that connects to `target_addr` using `proxy_addr` as a socks proxy
    /// server. The DNS resolution of `target_addr`, if required, is done through the proxy. This
    /// allows to specify, for instance, `.onion` addresses.
    ///
    /// `target_addr` is converted to a `TargetAddr` exactly once. That same value is used to
    /// open the proxied connection (with or without credentials) and, afterwards, as the
    /// address whose domain the TLS layer works with, so both steps always agree.
    pub fn new_proxy_ssl<T: ToTargetAddr>(
        target_addr: T,
        validate_domain: bool,
        proxy: &crate::Socks5Config,
        timeout: Option<Duration>,
    ) -> Result<RawClient<ElectrumSslStream>, Error> {
        let target = target_addr.to_target_addr()?;

        let mut stream = match proxy.credentials.as_ref() {
            Some(cred) => Socks5Stream::connect_with_password(
                &proxy.addr,
                target.clone(),
                &cred.username,
                &cred.password,
                timeout,
            )?,
            None => Socks5Stream::connect(&proxy.addr, target.clone(), timeout)?,
        };
        stream.get_mut().set_read_timeout(timeout)?;
        stream.get_mut().set_write_timeout(timeout)?;

        // The TLS session is layered on the raw TCP stream that is now tunnelled by the proxy.
        RawClient::new_ssl_from_stream(target, validate_domain, stream.into_inner())
    }
}
501
502#[derive(Debug)]
503enum ChannelMessage {
504    Response(serde_json::Value),
505    WakeUp,
506    Error(Arc<std::io::Error>),
507}
508
509impl<S: Read + Write> RawClient<S> {
510    // TODO: to enable this we have to find a way to allow concurrent read and writes to the
511    // underlying transport struct. This can be done pretty easily for TcpStream because it can be
512    // split into a "read" and a "write" object, but it's not as trivial for other types. Without
513    // such thing, this causes a deadlock, because the reader thread takes a lock on the
514    // `ClonableStream` before other threads can send a request to the server. They will block
515    // waiting for the reader to release the mutex, but this will never happen because the server
516    // didn't receive any request, so it has nothing to send back.
517    // pub fn reader_thread(&self) -> Result<(), Error> {
518    //     self._reader_thread(None).map(|_| ())
519    // }
520
521    fn _reader_thread(&self, until_message: Option<usize>) -> Result<serde_json::Value, Error> {
522        let mut raw_resp = String::new();
523        let resp = match self.buf_reader.try_lock() {
524            Ok(mut reader) => {
525                trace!(
526                    "Starting reader thread with `until_message` = {:?}",
527                    until_message
528                );
529
530                if let Some(until_message) = until_message {
531                    // If we are trying to start a reader thread but the corresponding sender is
532                    // missing from the map, exit immediately. We might have already received a
533                    // response for that id, but we don't know it yet. Exiting here forces the
534                    // calling code to fallback to the sender-receiver method, and it should find
535                    // a message there waiting for it.
536                    if self.waiting_map.lock()?.get(&until_message).is_none() {
537                        return Err(Error::CouldntLockReader);
538                    }
539                }
540
541                // Loop over every message
542                loop {
543                    raw_resp.clear();
544
545                    if let Err(e) = reader.read_line(&mut raw_resp) {
546                        let error = Arc::new(e);
547                        for (_, s) in self.waiting_map.lock().unwrap().drain() {
548                            s.send(ChannelMessage::Error(error.clone()))?;
549                        }
550                        return Err(Error::SharedIOError(error));
551                    }
552                    trace!("<== {}", raw_resp);
553
554                    let resp: serde_json::Value = serde_json::from_str(&raw_resp)?;
555
556                    // Normally there is and id, but it's missing for spontaneous notifications
557                    // from the server
558                    let resp_id = resp["id"]
559                        .as_str()
560                        .and_then(|s| s.parse().ok())
561                        .or_else(|| resp["id"].as_u64().map(|i| i as usize));
562                    match resp_id {
563                        Some(resp_id) if until_message == Some(resp_id) => {
564                            // We have a valid id and it's exactly the one we were waiting for!
565                            trace!(
566                                "Reader thread {} received a response for its request",
567                                resp_id
568                            );
569
570                            // Remove ourselves from the "waiting map"
571                            let mut map = self.waiting_map.lock()?;
572                            map.remove(&resp_id);
573
574                            // If the map is not empty, we select a random thread to become the
575                            // new reader thread.
576                            if let Some(err) = map.values().find_map(|sender| {
577                                sender
578                                    .send(ChannelMessage::WakeUp)
579                                    .inspect_err(|_| {
580                                        warn!("Unable to wake up a thread, trying some other");
581                                    })
582                                    .err()
583                            }) {
584                                error!("All the threads has failed, giving up");
585                                return Err(err)?;
586                            }
587
588                            break Ok(resp);
589                        }
590                        Some(resp_id) => {
591                            // We have an id, but it's not our response. Notify the thread and
592                            // move on
593                            trace!("Reader thread received response for {}", resp_id);
594
595                            if let Some(sender) = self.waiting_map.lock()?.remove(&resp_id) {
596                                sender.send(ChannelMessage::Response(resp))?;
597                            } else {
598                                warn!("Missing listener for {}", resp_id);
599                            }
600                        }
601                        None => {
602                            // No id, that's probably a notification.
603                            let mut resp = resp;
604
605                            if let Some(method) = resp["method"].take().as_str() {
606                                self.handle_notification(method, resp["params"].take())?;
607                            } else {
608                                warn!("Unexpected response: {:?}", resp);
609                            }
610                        }
611                    }
612                }
613            }
614            Err(TryLockError::WouldBlock) => {
615                // If we "WouldBlock" here it means that there's already a reader thread
616                // running somewhere.
617                Err(Error::CouldntLockReader)
618            }
619            Err(TryLockError::Poisoned(e)) => Err(e)?,
620        };
621
622        let resp = resp?;
623        if let Some(err) = resp.get("error") {
624            let err = serde_json::from_value(err.clone())
625                .map_err(|_| Error::InvalidResponse(resp.clone()))?;
626            Err(Error::Protocol(err))
627        } else {
628            Ok(resp)
629        }
630    }
631
632    fn call(&self, req: Request) -> Result<serde_json::Value, Error> {
633        // Add our listener to the map before we send the request, to make sure we don't get a
634        // reply before the receiver is added
635        let (sender, receiver) = channel();
636        self.waiting_map.lock()?.insert(req.id, sender);
637
638        let mut raw = serde_json::to_vec(&req)?;
639        trace!("==> {}", String::from_utf8_lossy(&raw));
640
641        raw.extend_from_slice(b"\n");
642        let mut stream = self.stream.lock()?;
643        stream.write_all(&raw)?;
644        stream.flush()?;
645        drop(stream); // release the lock
646
647        self.increment_calls();
648
649        let mut resp = match self.recv(&receiver, req.id) {
650            Ok(resp) => resp,
651            e @ Err(_) => {
652                // In case of error our sender could still be left in the map, depending on where
653                // the error happened. Just in case, try to remove it here
654                self.waiting_map.lock()?.remove(&req.id);
655                return e;
656            }
657        };
658        Ok(resp["result"].take())
659    }
660
661    fn recv(
662        &self,
663        receiver: &Receiver<ChannelMessage>,
664        req_id: usize,
665    ) -> Result<serde_json::Value, Error> {
666        loop {
667            // Try to take the lock on the reader. If we manage to do so, we'll become the reader
668            // thread until we get our response
669            match self._reader_thread(Some(req_id)) {
670                Ok(response) => break Ok(response),
671                Err(Error::CouldntLockReader) => {
672                    match receiver.recv()? {
673                        // Received our response, returning it
674                        ChannelMessage::Response(received) => break Ok(received),
675                        ChannelMessage::WakeUp => {
676                            // We have been woken up, this means that we should try becoming the
677                            // reader thread ourselves
678                            trace!("WakeUp for {}", req_id);
679
680                            continue;
681                        }
682                        ChannelMessage::Error(e) => {
683                            warn!("Received ChannelMessage::Error");
684
685                            break Err(Error::SharedIOError(e));
686                        }
687                    }
688                }
689                e @ Err(_) => break e,
690            }
691        }
692    }
693
694    fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
695        match method {
696            "blockchain.headers.subscribe" => self.headers.lock()?.append(
697                &mut serde_json::from_value::<Vec<RawHeaderNotification>>(result)?
698                    .into_iter()
699                    .collect(),
700            ),
701            "blockchain.scripthash.subscribe" => {
702                let unserialized: ScriptNotification = serde_json::from_value(result)?;
703                let mut script_notifications = self.script_notifications.lock()?;
704
705                let queue = script_notifications
706                    .get_mut(&unserialized.scripthash)
707                    .ok_or(Error::NotSubscribed(unserialized.scripthash))?;
708
709                queue.push_back(unserialized.status);
710            }
711            _ => info!("received unknown notification for method `{}`", method),
712        }
713
714        Ok(())
715    }
716
717    pub(crate) fn internal_raw_call_with_vec(
718        &self,
719        method_name: &str,
720        params: Vec<Param>,
721    ) -> Result<serde_json::Value, Error> {
722        let req = Request::new_id(
723            self.last_id.fetch_add(1, Ordering::SeqCst),
724            method_name,
725            params,
726        );
727        let result = self.call(req)?;
728
729        Ok(result)
730    }
731
732    #[inline]
733    #[cfg(feature = "debug-calls")]
734    fn increment_calls(&self) {
735        self.calls.fetch_add(1, Ordering::SeqCst);
736    }
737
738    #[inline]
739    #[cfg(not(feature = "debug-calls"))]
740    fn increment_calls(&self) {}
741}
742
743impl<T: Read + Write> ElectrumApi for RawClient<T> {
744    fn raw_call(
745        &self,
746        method_name: &str,
747        params: impl IntoIterator<Item = Param>,
748    ) -> Result<serde_json::Value, Error> {
749        self.internal_raw_call_with_vec(method_name, params.into_iter().collect())
750    }
751
752    fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
753        let mut raw = Vec::new();
754
755        let mut missing_responses = Vec::new();
756        let mut answers = BTreeMap::new();
757
758        // Add our listener to the map before we send the request
759
760        for (method, params) in batch.iter() {
761            let req = Request::new_id(
762                self.last_id.fetch_add(1, Ordering::SeqCst),
763                method,
764                params.to_vec(),
765            );
766            // Add distinct channel to each request so when we remove our request id (and sender) from the waiting_map
767            // we can be sure that the response gets sent to the correct channel in self.recv
768            let (sender, receiver) = channel();
769            missing_responses.push((req.id, receiver));
770
771            self.waiting_map.lock()?.insert(req.id, sender);
772
773            raw.append(&mut serde_json::to_vec(&req)?);
774            raw.extend_from_slice(b"\n");
775        }
776
777        if missing_responses.is_empty() {
778            return Ok(vec![]);
779        }
780
781        trace!("==> {}", String::from_utf8_lossy(&raw));
782
783        let mut stream = self.stream.lock()?;
784        stream.write_all(&raw)?;
785        stream.flush()?;
786        drop(stream); // release the lock
787
788        self.increment_calls();
789
790        for (req_id, receiver) in missing_responses.iter() {
791            match self.recv(receiver, *req_id) {
792                Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
793                Err(e) => {
794                    // In case of error our sender could still be left in the map, depending on where
795                    // the error happened. Just in case, try to remove it here
796                    warn!("got error for req_id {}: {:?}", req_id, e);
797                    warn!("removing all waiting req of this batch");
798                    let mut guard = self.waiting_map.lock()?;
799                    for (req_id, _) in missing_responses.iter() {
800                        guard.remove(req_id);
801                    }
802                    return Err(e);
803                }
804            };
805        }
806
807        Ok(answers.into_values().collect())
808    }
809
810    fn block_headers_subscribe_raw(&self) -> Result<RawHeaderNotification, Error> {
811        let req = Request::new_id(
812            self.last_id.fetch_add(1, Ordering::SeqCst),
813            "blockchain.headers.subscribe",
814            vec![],
815        );
816        let value = self.call(req)?;
817
818        Ok(serde_json::from_value(value)?)
819    }
820
821    fn block_headers_pop_raw(&self) -> Result<Option<RawHeaderNotification>, Error> {
822        Ok(self.headers.lock()?.pop_front())
823    }
824
825    fn block_header_raw(&self, height: usize) -> Result<Vec<u8>, Error> {
826        let req = Request::new_id(
827            self.last_id.fetch_add(1, Ordering::SeqCst),
828            "blockchain.block.header",
829            vec![Param::Usize(height)],
830        );
831        let result = self.call(req)?;
832
833        Ok(Vec::<u8>::from_hex(
834            result
835                .as_str()
836                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
837        )?)
838    }
839
840    fn block_headers(&self, start_height: usize, count: usize) -> Result<GetHeadersRes, Error> {
841        let req = Request::new_id(
842            self.last_id.fetch_add(1, Ordering::SeqCst),
843            "blockchain.block.headers",
844            vec![Param::Usize(start_height), Param::Usize(count)],
845        );
846        let result = self.call(req)?;
847
848        let mut deserialized: GetHeadersRes = serde_json::from_value(result)?;
849        for i in 0..deserialized.count {
850            let (start, end) = (i * 80, (i + 1) * 80);
851            deserialized
852                .headers
853                .push(BlockHeader::consensus_deserialize(
854                    &deserialized.raw_headers[start..end],
855                )?);
856        }
857        deserialized.raw_headers.clear();
858
859        Ok(deserialized)
860    }
861
862    fn estimate_fee(&self, number: usize) -> Result<f64, Error> {
863        let req = Request::new_id(
864            self.last_id.fetch_add(1, Ordering::SeqCst),
865            "blockchain.estimatefee",
866            vec![Param::Usize(number)],
867        );
868        let result = self.call(req)?;
869
870        result
871            .as_f64()
872            .ok_or_else(|| Error::InvalidResponse(result.clone()))
873    }
874
875    fn relay_fee(&self) -> Result<f64, Error> {
876        let req = Request::new_id(
877            self.last_id.fetch_add(1, Ordering::SeqCst),
878            "blockchain.relayfee",
879            vec![],
880        );
881        let result = self.call(req)?;
882
883        result
884            .as_f64()
885            .ok_or_else(|| Error::InvalidResponse(result.clone()))
886    }
887
888    fn script_subscribe(&self, script: &ScriptPubkey) -> Result<Option<ScriptStatus>, Error> {
889        let script_hash = script.to_electrum_scripthash();
890        let mut script_notifications = self.script_notifications.lock()?;
891
892        if script_notifications.contains_key(&script_hash) {
893            return Err(Error::AlreadySubscribed(script_hash));
894        }
895
896        script_notifications.insert(script_hash, VecDeque::new());
897        drop(script_notifications);
898
899        let req = Request::new_id(
900            self.last_id.fetch_add(1, Ordering::SeqCst),
901            "blockchain.scripthash.subscribe",
902            vec![Param::String(script_hash.as_hex())],
903        );
904        let value = self.call(req)?;
905
906        Ok(serde_json::from_value(value)?)
907    }
908
909    fn batch_script_subscribe<'s, I>(&self, scripts: I) -> Result<Vec<Option<ScriptStatus>>, Error>
910    where
911        I: IntoIterator + Clone,
912        I::Item: Borrow<&'s ScriptPubkey>,
913    {
914        {
915            let mut script_notifications = self.script_notifications.lock()?;
916
917            for script in scripts.clone() {
918                let script_hash = script.borrow().to_electrum_scripthash();
919                if script_notifications.contains_key(&script_hash) {
920                    return Err(Error::AlreadySubscribed(script_hash));
921                }
922                script_notifications.insert(script_hash, VecDeque::new());
923            }
924        }
925        impl_batch_call!(self, scripts, script_subscribe)
926    }
927
928    fn script_unsubscribe(&self, script: &ScriptPubkey) -> Result<bool, Error> {
929        let script_hash = script.to_electrum_scripthash();
930        let mut script_notifications = self.script_notifications.lock()?;
931
932        if !script_notifications.contains_key(&script_hash) {
933            return Err(Error::NotSubscribed(script_hash));
934        }
935
936        let req = Request::new_id(
937            self.last_id.fetch_add(1, Ordering::SeqCst),
938            "blockchain.scripthash.unsubscribe",
939            vec![Param::String(script_hash.as_hex())],
940        );
941        let value = self.call(req)?;
942        let answer = serde_json::from_value(value)?;
943
944        script_notifications.remove(&script_hash);
945
946        Ok(answer)
947    }
948
949    fn script_pop(&self, script: &ScriptPubkey) -> Result<Option<ScriptStatus>, Error> {
950        let script_hash = script.to_electrum_scripthash();
951
952        match self.script_notifications.lock()?.get_mut(&script_hash) {
953            None => Err(Error::NotSubscribed(script_hash)),
954            Some(queue) => Ok(queue.pop_front()),
955        }
956    }
957
958    fn script_get_balance(&self, script: &ScriptPubkey) -> Result<GetBalanceRes, Error> {
959        let params = vec![Param::String(script.to_electrum_scripthash().as_hex())];
960        let req = Request::new_id(
961            self.last_id.fetch_add(1, Ordering::SeqCst),
962            "blockchain.scripthash.get_balance",
963            params,
964        );
965        let result = self.call(req)?;
966
967        Ok(serde_json::from_value(result)?)
968    }
969    fn batch_script_get_balance<'s, I>(&self, scripts: I) -> Result<Vec<GetBalanceRes>, Error>
970    where
971        I: IntoIterator + Clone,
972        I::Item: Borrow<&'s ScriptPubkey>,
973    {
974        impl_batch_call!(self, scripts, script_get_balance)
975    }
976
977    fn script_get_history(&self, script: &ScriptPubkey) -> Result<Vec<GetHistoryRes>, Error> {
978        let params = vec![Param::String(script.to_electrum_scripthash().as_hex())];
979        let req = Request::new_id(
980            self.last_id.fetch_add(1, Ordering::SeqCst),
981            "blockchain.scripthash.get_history",
982            params,
983        );
984        let result = self.call(req)?;
985
986        Ok(serde_json::from_value(result)?)
987    }
988    fn batch_script_get_history<'s, I>(&self, scripts: I) -> Result<Vec<Vec<GetHistoryRes>>, Error>
989    where
990        I: IntoIterator + Clone,
991        I::Item: Borrow<&'s ScriptPubkey>,
992    {
993        impl_batch_call!(self, scripts, script_get_history)
994    }
995
996    fn script_list_unspent(&self, script: &ScriptPubkey) -> Result<Vec<ListUnspentRes>, Error> {
997        let params = vec![Param::String(script.to_electrum_scripthash().as_hex())];
998        let req = Request::new_id(
999            self.last_id.fetch_add(1, Ordering::SeqCst),
1000            "blockchain.scripthash.listunspent",
1001            params,
1002        );
1003        let result = self.call(req)?;
1004        let mut result: Vec<ListUnspentRes> = serde_json::from_value(result)?;
1005
1006        // This should not be necessary, since the protocol documentation says that the txs should
1007        // be "in blockchain order" (https://electrumx.readthedocs.io/en/latest/protocol-methods.html#blockchain-scripthash-listunspent).
1008        // However, elects seems to be ignoring this at the moment, so we'll sort again here just
1009        // to make sure the result is consistent.
1010        result.sort_unstable_by_key(|k| (k.height, k.tx_pos));
1011        Ok(result)
1012    }
1013
1014    fn batch_script_list_unspent<'s, I>(
1015        &self,
1016        scripts: I,
1017    ) -> Result<Vec<Vec<ListUnspentRes>>, Error>
1018    where
1019        I: IntoIterator + Clone,
1020        I::Item: Borrow<&'s ScriptPubkey>,
1021    {
1022        impl_batch_call!(self, scripts, script_list_unspent)
1023    }
1024
1025    fn script_get_mempool(&self, script: &ScriptPubkey) -> Result<Vec<GetMempoolRes>, Error> {
1026        let params = vec![Param::String(script.to_electrum_scripthash().as_hex())];
1027        let req = Request::new_id(
1028            self.last_id.fetch_add(1, Ordering::SeqCst),
1029            "blockchain.scripthash.listunspent",
1030            params,
1031        );
1032        let result = self.call(req)?;
1033        Ok(serde_json::from_value(result)?)
1034    }
1035
1036    fn transaction_get_raw(&self, txid: &Txid) -> Result<Option<Vec<u8>>, Error> {
1037        let params = vec![Param::String(format!("{:x}", txid))];
1038        let req = Request::new_id(
1039            self.last_id.fetch_add(1, Ordering::SeqCst),
1040            "blockchain.transaction.get",
1041            params,
1042        );
1043        let result = match self.call(req) {
1044            Ok(result) => result,
1045            Err(Error::Protocol(_)) => return Ok(None),
1046            Err(e) => Err(e)?,
1047        };
1048
1049        Ok(Some(Vec::<u8>::from_hex(
1050            result
1051                .as_str()
1052                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
1053        )?))
1054    }
1055
1056    fn transaction_get_verbose(&self, txid: &Txid) -> Result<Option<TxRes>, Error> {
1057        let params = vec![Param::String(format!("{:x}", txid)), Param::Bool(true)];
1058        let req = Request::new_id(
1059            self.last_id.fetch_add(1, Ordering::SeqCst),
1060            "blockchain.transaction.get",
1061            params,
1062        );
1063        let result = match self.call(req) {
1064            Ok(result) => result,
1065            Err(Error::Protocol(_)) => return Ok(None),
1066            Err(e) => Err(e)?,
1067        };
1068
1069        let confirmations = result
1070            .get("confirmations")
1071            .map(|v| {
1072                v.as_u64()
1073                    .map(u32::try_from)
1074                    .transpose()
1075                    .ok()
1076                    .flatten()
1077                    .ok_or_else(|| Error::InvalidResponse(v.clone()))
1078            })
1079            .transpose()?
1080            .unwrap_or_default();
1081        let block_hash = result
1082            .get("blockhash")
1083            .map(|hex| {
1084                let s = hex
1085                    .as_str()
1086                    .ok_or_else(|| Error::InvalidResponse(hex.clone()))?;
1087                let data = Vec::<u8>::from_hex(s)?;
1088                Result::<_, Error>::Ok(BlockHash::consensus_deserialize(data)?)
1089            })
1090            .transpose()?;
1091        let time = result
1092            .get("blocktime")
1093            .map(|v| v.as_u64().ok_or_else(|| Error::InvalidResponse(v.clone())))
1094            .transpose()?;
1095        let tx = Vec::<u8>::from_hex(
1096            result
1097                .get("hex")
1098                .and_then(serde_json::Value::as_str)
1099                .ok_or_else(|| Error::InvalidResponse(result.clone()))?,
1100        )?;
1101        let tx = Tx::consensus_deserialize(tx)?;
1102
1103        Ok(Some(TxRes {
1104            confirmations,
1105            block_hash,
1106            time,
1107            tx,
1108        }))
1109    }
1110
1111    fn batch_transaction_get_raw<'t, I>(&self, txids: I) -> Result<Vec<Vec<u8>>, Error>
1112    where
1113        I: IntoIterator + Clone,
1114        I::Item: Borrow<&'t Txid>,
1115    {
1116        let txs_string: Result<Vec<String>, Error> = impl_batch_call!(self, txids, transaction_get);
1117        txs_string?
1118            .iter()
1119            .map(|s| Ok(Vec::<u8>::from_hex(s)?))
1120            .collect()
1121    }
1122
1123    fn batch_block_header_raw<'s, I>(&self, heights: I) -> Result<Vec<Vec<u8>>, Error>
1124    where
1125        I: IntoIterator + Clone,
1126        I::Item: Borrow<u32>,
1127    {
1128        let headers_string: Result<Vec<String>, Error> =
1129            impl_batch_call!(self, heights, block_header, apply_deref);
1130        headers_string?
1131            .iter()
1132            .map(|s| Ok(Vec::<u8>::from_hex(s)?))
1133            .collect()
1134    }
1135
1136    fn batch_estimate_fee<'s, I>(&self, numbers: I) -> Result<Vec<f64>, Error>
1137    where
1138        I: IntoIterator + Clone,
1139        I::Item: Borrow<usize>,
1140    {
1141        impl_batch_call!(self, numbers, estimate_fee, apply_deref)
1142    }
1143
1144    fn transaction_broadcast_raw(&self, raw_tx: &[u8]) -> Result<Txid, Error> {
1145        let params = vec![Param::String(raw_tx.to_hex())];
1146        let req = Request::new_id(
1147            self.last_id.fetch_add(1, Ordering::SeqCst),
1148            "blockchain.transaction.broadcast",
1149            params,
1150        );
1151        let result = self.call(req)?;
1152
1153        Ok(serde_json::from_value(result)?)
1154    }
1155
1156    fn transaction_get_merkle(&self, txid: &Txid, height: usize) -> Result<GetMerkleRes, Error> {
1157        let params = vec![Param::String(format!("{:x}", txid)), Param::Usize(height)];
1158        let req = Request::new_id(
1159            self.last_id.fetch_add(1, Ordering::SeqCst),
1160            "blockchain.transaction.get_merkle",
1161            params,
1162        );
1163        let result = self.call(req)?;
1164
1165        Ok(serde_json::from_value(result)?)
1166    }
1167
1168    fn txid_from_pos(&self, height: usize, tx_pos: usize) -> Result<Txid, Error> {
1169        let params = vec![Param::Usize(height), Param::Usize(tx_pos)];
1170        let req = Request::new_id(
1171            self.last_id.fetch_add(1, Ordering::SeqCst),
1172            "blockchain.transaction.id_from_pos",
1173            params,
1174        );
1175        let result = self.call(req)?;
1176
1177        Ok(serde_json::from_value(result)?)
1178    }
1179
1180    fn txid_from_pos_with_merkle(
1181        &self,
1182        height: usize,
1183        tx_pos: usize,
1184    ) -> Result<TxidFromPosRes, Error> {
1185        let params = vec![
1186            Param::Usize(height),
1187            Param::Usize(tx_pos),
1188            Param::Bool(true),
1189        ];
1190        let req = Request::new_id(
1191            self.last_id.fetch_add(1, Ordering::SeqCst),
1192            "blockchain.transaction.id_from_pos",
1193            params,
1194        );
1195        let result = self.call(req)?;
1196
1197        Ok(serde_json::from_value(result)?)
1198    }
1199
1200    fn server_features(&self) -> Result<ServerFeaturesRes, Error> {
1201        let req = Request::new_id(
1202            self.last_id.fetch_add(1, Ordering::SeqCst),
1203            "server.features",
1204            vec![],
1205        );
1206        let result = self.call(req)?;
1207
1208        Ok(serde_json::from_value(result)?)
1209    }
1210
1211    fn ping(&self) -> Result<(), Error> {
1212        let req = Request::new_id(
1213            self.last_id.fetch_add(1, Ordering::SeqCst),
1214            "server.ping",
1215            vec![],
1216        );
1217        self.call(req)?;
1218
1219        Ok(())
1220    }
1221
1222    #[cfg(feature = "debug-calls")]
1223    fn calls_made(&self) -> Result<usize, Error> {
1224        Ok(self.calls.load(Ordering::SeqCst))
1225    }
1226}
1227
1228#[cfg(test)]
1229mod test {
1230    use bp::TxVer;
1231    use invoice::Address;
1232    use std::str::FromStr;
1233
1234    use super::*;
1235    use crate::api::ElectrumApi;
1236    use crate::utils;
1237
1238    fn get_test_server() -> String {
1239        std::env::var("TEST_ELECTRUM_SERVER").unwrap_or("electrum.blockstream.info:50001".into())
1240    }
1241
1242    #[test]
1243    fn test_server_features_simple() {
1244        let client = RawClient::new(get_test_server(), None).unwrap();
1245
1246        let resp = client.server_features().unwrap();
1247        assert_eq!(
1248            resp.genesis_hash,
1249            [
1250                0, 0, 0, 0, 0, 25, 214, 104, 156, 8, 90, 225, 101, 131, 30, 147, 79, 247, 99, 174,
1251                70, 162, 166, 193, 114, 179, 241, 182, 10, 140, 226, 111
1252            ],
1253        );
1254        assert_eq!(resp.hash_function, Some("sha256".into()));
1255        assert_eq!(resp.pruning, None);
1256    }
1257
1258    #[test]
1259    #[ignore = "depends on a live server"]
1260    fn test_batch_response_ordering() {
1261        // The electrum.blockstream.info:50001 node always sends back ordered responses which will make this always pass.
1262        // However, many servers do not, so we use one of those servers for this test.
1263        let client = RawClient::new("exs.dyshek.org:50001", None).unwrap();
1264        let heights: Vec<u32> = vec![1, 4, 8, 12, 222, 6666, 12];
1265        let result_times = [
1266            1231469665, 1231470988, 1231472743, 1231474888, 1231770653, 1236456633, 1231474888,
1267        ];
1268        // Check ordering 10 times. This usually fails within 5 if ordering is incorrect.
1269        for _ in 0..10 {
1270            let results = client.batch_block_header(&heights).unwrap();
1271            for (index, result) in results.iter().enumerate() {
1272                assert_eq!(result_times[index], result.time);
1273            }
1274        }
1275    }
1276
1277    #[test]
1278    fn test_relay_fee() {
1279        let client = RawClient::new(get_test_server(), None).unwrap();
1280
1281        let resp = client.relay_fee().unwrap();
1282        assert_eq!(resp, 0.00001);
1283    }
1284
1285    #[test]
1286    fn test_estimate_fee() {
1287        let client = RawClient::new(get_test_server(), None).unwrap();
1288
1289        let resp = client.estimate_fee(10).unwrap();
1290        assert!(resp > 0.0);
1291    }
1292
1293    #[test]
1294    fn test_block_header() {
1295        let client = RawClient::new(get_test_server(), None).unwrap();
1296
1297        let resp = client.block_header(0).unwrap();
1298        assert_eq!(resp.version, 1);
1299        assert_eq!(resp.time, 1231006505);
1300        assert_eq!(resp.nonce, 0x7c2bac1d);
1301    }
1302
1303    #[test]
1304    fn test_block_header_raw() {
1305        let client = RawClient::new(get_test_server(), None).unwrap();
1306
1307        let resp = client.block_header_raw(0).unwrap();
1308        assert_eq!(
1309            resp,
1310            vec![
1311                1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1312                0, 0, 0, 0, 0, 0, 0, 0, 59, 163, 237, 253, 122, 123, 18, 178, 122, 199, 44, 62,
1313                103, 118, 143, 97, 127, 200, 27, 195, 136, 138, 81, 50, 58, 159, 184, 170, 75, 30,
1314                94, 74, 41, 171, 95, 73, 255, 255, 0, 29, 29, 172, 43, 124
1315            ]
1316        );
1317    }
1318
1319    #[test]
1320    fn test_block_headers() {
1321        let client = RawClient::new(get_test_server(), None).unwrap();
1322
1323        let resp = client.block_headers(0, 4).unwrap();
1324        assert_eq!(resp.count, 4);
1325        assert_eq!(resp.max, 2016);
1326        assert_eq!(resp.headers.len(), 4);
1327
1328        assert_eq!(resp.headers[0].time, 1231006505);
1329    }
1330
1331    #[test]
1332    fn test_script_get_balance() {
1333        use std::str::FromStr;
1334
1335        let client = RawClient::new(get_test_server(), None).unwrap();
1336
1337        // Realistically nobody will ever spend from this address, so we can expect the balance to
1338        // increase over time
1339        let addr = Address::from_str("1CounterpartyXXXXXXXXXXXXXXXUWLpVr").unwrap();
1340        let resp = client.script_get_balance(&addr.script_pubkey()).unwrap();
1341        assert!(resp.confirmed >= 213091301265);
1342    }
1343
1344    #[test]
1345    fn test_script_get_history() {
1346        use std::str::FromStr;
1347
1348        let client = RawClient::new(get_test_server(), None).unwrap();
1349
1350        // Mt.Gox hack address
1351        let addr = Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF").unwrap();
1352        let resp = client.script_get_history(&addr.script_pubkey()).unwrap();
1353
1354        assert!(resp.len() >= 328);
1355        assert_eq!(
1356            resp[0].tx_hash,
1357            Txid::from_str("e67a0550848b7932d7796aeea16ab0e48a5cfe81c4e8cca2c5b03e0416850114")
1358                .unwrap()
1359        );
1360    }
1361
1362    #[test]
1363    fn test_script_list_unspent() {
1364        use std::str::FromStr;
1365
1366        let client = RawClient::new(get_test_server(), None).unwrap();
1367
1368        // Peter todd's sha256 bounty address https://bitcointalk.org/index.php?topic=293382.0
1369        let addr = Address::from_str("35Snmmy3uhaer2gTboc81ayCip4m9DT4ko").unwrap();
1370        let resp = client.script_list_unspent(&addr.script_pubkey()).unwrap();
1371
1372        assert!(resp.len() >= 9);
1373        let txid = "397f12ee15f8a3d2ab25c0f6bb7d3c64d2038ca056af10dd8251b98ae0f076b0";
1374        let txid = Txid::from_str(txid).unwrap();
1375        let txs: Vec<_> = resp.iter().filter(|e| e.tx_hash == txid).collect();
1376        assert_eq!(txs.len(), 1);
1377        assert_eq!(txs[0].value, 10000000);
1378        assert_eq!(txs[0].height, 257674);
1379        assert_eq!(txs[0].tx_pos, 1);
1380    }
1381
1382    #[test]
1383    fn test_batch_script_list_unspent() {
1384        use std::str::FromStr;
1385
1386        let client = RawClient::new(get_test_server(), None).unwrap();
1387
1388        // Peter todd's sha256 bounty address https://bitcointalk.org/index.php?topic=293382.0
1389        let script_1 = Address::from_str("35Snmmy3uhaer2gTboc81ayCip4m9DT4ko")
1390            .unwrap()
1391            .script_pubkey();
1392
1393        let resp = client.batch_script_list_unspent(vec![&script_1]).unwrap();
1394        assert_eq!(resp.len(), 1);
1395        assert!(resp[0].len() >= 9);
1396    }
1397
1398    #[test]
1399    fn test_batch_estimate_fee() {
1400        let client = RawClient::new(get_test_server(), None).unwrap();
1401
1402        let resp = client.batch_estimate_fee(vec![10, 20]).unwrap();
1403        assert_eq!(resp.len(), 2);
1404        assert!(resp[0] > 0.0);
1405        assert!(resp[1] > 0.0);
1406    }
1407
1408    #[test]
1409    fn test_transaction_get() {
1410        let client = RawClient::new(get_test_server(), None).unwrap();
1411
1412        let resp = client
1413            .transaction_get(
1414                &Txid::from_str("cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566")
1415                    .unwrap(),
1416            )
1417            .unwrap()
1418            .unwrap();
1419        assert_eq!(resp.version, TxVer::V1);
1420        assert_eq!(resp.lock_time.to_consensus_u32(), 0);
1421    }
1422
1423    #[test]
1424    fn test_transaction_get_raw() {
1425        let client = RawClient::new(get_test_server(), None).unwrap();
1426
1427        let resp = client
1428            .transaction_get_raw(
1429                &Txid::from_str("cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566")
1430                    .unwrap(),
1431            )
1432            .unwrap();
1433        assert_eq!(
1434            resp,
1435            Some(vec![
1436                1, 0, 0, 0, 0, 1, 1, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1437                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 255, 255, 255, 255, 84, 3, 240, 156, 9, 27, 77,
1438                105, 110, 101, 100, 32, 98, 121, 32, 65, 110, 116, 80, 111, 111, 108, 49, 49, 57,
1439                174, 0, 111, 32, 7, 77, 101, 40, 250, 190, 109, 109, 42, 177, 148, 141, 80, 179,
1440                217, 145, 226, 160, 130, 29, 247, 67, 88, 237, 156, 37, 83, 175, 0, 199, 166, 31,
1441                151, 119, 28, 160, 172, 238, 16, 110, 4, 0, 0, 0, 0, 0, 0, 0, 203, 236, 0, 128, 36,
1442                97, 249, 5, 255, 255, 255, 255, 3, 84, 206, 172, 42, 0, 0, 0, 0, 25, 118, 169, 20,
1443                17, 219, 228, 140, 198, 182, 23, 249, 198, 173, 175, 77, 158, 213, 246, 37, 177,
1444                199, 203, 89, 136, 172, 0, 0, 0, 0, 0, 0, 0, 0, 38, 106, 36, 170, 33, 169, 237, 46,
1445                87, 139, 206, 44, 166, 198, 188, 147, 89, 55, 115, 69, 216, 233, 133, 221, 95, 144,
1446                199, 132, 33, 255, 166, 239, 165, 235, 96, 66, 142, 105, 140, 0, 0, 0, 0, 0, 0, 0,
1447                0, 38, 106, 36, 185, 225, 27, 109, 47, 98, 29, 126, 195, 244, 90, 94, 202, 137,
1448                211, 234, 106, 41, 76, 223, 58, 4, 46, 151, 48, 9, 88, 68, 112, 161, 41, 22, 17,
1449                30, 44, 170, 1, 32, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0,
1450                0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0
1451            ])
1452        )
1453    }
1454
1455    #[test]
1456    fn test_transaction_get_merkle() {
1457        let client = RawClient::new(get_test_server(), None).unwrap();
1458
1459        let txid =
1460            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1461                .unwrap();
1462        let resp = client.transaction_get_merkle(&txid, 630000).unwrap();
1463        assert_eq!(resp.block_height, 630000);
1464        assert_eq!(resp.pos, 68);
1465        assert_eq!(resp.merkle.len(), 12);
1466        assert_eq!(
1467            resp.merkle[0],
1468            [
1469                34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47, 66,
1470                179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25
1471            ]
1472        );
1473
1474        // Check we can verify the merkle proof validity, but fail if we supply wrong data.
1475        let block_header = client.block_header(resp.block_height).unwrap();
1476        assert!(utils::validate_merkle_proof(
1477            &txid,
1478            &block_header.merkle_root,
1479            &resp
1480        ));
1481
1482        let mut fail_resp = resp.clone();
1483        fail_resp.pos = 13;
1484        assert!(!utils::validate_merkle_proof(
1485            &txid,
1486            &block_header.merkle_root,
1487            &fail_resp
1488        ));
1489
1490        let fail_block_header = client.block_header(resp.block_height + 1).unwrap();
1491        assert!(!utils::validate_merkle_proof(
1492            &txid,
1493            &fail_block_header.merkle_root,
1494            &resp
1495        ));
1496    }
1497
1498    #[test]
1499    fn test_txid_from_pos() {
1500        let client = RawClient::new(get_test_server(), None).unwrap();
1501
1502        let txid =
1503            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1504                .unwrap();
1505        let resp = client.txid_from_pos(630000, 68).unwrap();
1506        assert_eq!(resp, txid);
1507    }
1508
1509    #[test]
1510    fn test_txid_from_pos_with_merkle() {
1511        let client = RawClient::new(get_test_server(), None).unwrap();
1512
1513        let txid =
1514            Txid::from_str("1f7ff3c407f33eabc8bec7d2cc230948f2249ec8e591bcf6f971ca9366c8788d")
1515                .unwrap();
1516        let resp = client.txid_from_pos_with_merkle(630000, 68).unwrap();
1517        assert_eq!(resp.tx_hash, txid);
1518        assert_eq!(
1519            resp.merkle[0],
1520            [
1521                34, 65, 51, 64, 49, 139, 115, 189, 185, 246, 70, 225, 168, 193, 217, 195, 47, 66,
1522                179, 240, 153, 24, 114, 215, 144, 196, 212, 41, 39, 155, 246, 25
1523            ]
1524        );
1525    }
1526
1527    #[test]
1528    fn test_ping() {
1529        let client = RawClient::new(get_test_server(), None).unwrap();
1530        client.ping().unwrap();
1531    }
1532
1533    #[test]
1534    fn test_block_headers_subscribe() {
1535        let client = RawClient::new(get_test_server(), None).unwrap();
1536        let resp = client.block_headers_subscribe().unwrap();
1537
1538        assert!(resp.height >= 639000);
1539    }
1540
1541    #[test]
1542    fn test_script_subscribe() {
1543        use std::str::FromStr;
1544
1545        let client = RawClient::new(get_test_server(), None).unwrap();
1546
1547        // Mt.Gox hack address
1548        let addr = Address::from_str("1FeexV6bAHb8ybZjqQMjJrcCrHGW9sb6uF").unwrap();
1549
1550        // Just make sure that the call returns Ok(something)
1551        client.script_subscribe(&addr.script_pubkey()).unwrap();
1552    }
1553
1554    #[test]
1555    fn test_request_after_error() {
1556        let client = RawClient::new(get_test_server(), None).unwrap();
1557
1558        assert!(client.transaction_broadcast_raw(&[0x00]).is_err());
1559        assert!(client.server_features().is_ok());
1560    }
1561
1562    #[test]
1563    fn test_raw_call() {
1564        use crate::types::Param;
1565
1566        let client = RawClient::new(get_test_server(), None).unwrap();
1567
1568        let params = vec![
1569            Param::String(
1570                "cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566".to_string(),
1571            ),
1572            Param::Bool(false),
1573        ];
1574
1575        let resp = client
1576            .raw_call("blockchain.transaction.get", params)
1577            .unwrap();
1578
1579        assert_eq!(
1580            resp,
1581            "01000000000101000000000000000000000000000000000000000000000000000\
1582            0000000000000ffffffff5403f09c091b4d696e656420627920416e74506f6f6c3\
1583            13139ae006f20074d6528fabe6d6d2ab1948d50b3d991e2a0821df74358ed9c255\
1584            3af00c7a61f97771ca0acee106e0400000000000000cbec00802461f905fffffff\
1585            f0354ceac2a000000001976a91411dbe48cc6b617f9c6adaf4d9ed5f625b1c7cb5\
1586            988ac0000000000000000266a24aa21a9ed2e578bce2ca6c6bc9359377345d8e98\
1587            5dd5f90c78421ffa6efa5eb60428e698c0000000000000000266a24b9e11b6d2f6\
1588            21d7ec3f45a5eca89d3ea6a294cdf3a042e973009584470a12916111e2caa01200\
1589            000000000000000000000000000000000000000000000000000000000000000000\
1590            00000"
1591        )
1592    }
1593}