use std::collections::{BTreeMap, HashMap, VecDeque};
use std::io::{BufRead, BufReader, Read, Write};
use std::mem::drop;
use std::net::{TcpStream, ToSocketAddrs};
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::mpsc::{Receiver, Sender, channel};
use std::sync::{Arc, Mutex, TryLockError};
use std::time::Duration;
#[allow(unused_imports)]
use log::{debug, error, info, trace, warn};
#[cfg(feature = "use-openssl")]
use openssl::ssl::{SslConnector, SslMethod, SslStream, SslVerifyMode};
#[cfg(all(
any(
feature = "default",
feature = "use-rustls",
feature = "use-rustls-ring"
),
not(feature = "use-openssl")
))]
use rustls::{
ClientConfig, ClientConnection, RootCertStore, StreamOwned,
pki_types::ServerName,
pki_types::{Der, TrustAnchor},
};
#[cfg(any(feature = "default", feature = "proxy"))]
use crate::socks::{Socks5Stream, TargetAddr, ToTargetAddr};
#[cfg(feature = "use-websocket")]
use crate::websocket::WebSocketWrapper;
#[cfg(feature = "use-websocket")]
use tungstenite;
use crate::stream::ClonableStream;
use crate::api::ElectrumApi;
use crate::batch::Batch;
use crate::types::*;
pub trait ToSocketAddrsDomain: ToSocketAddrs {
fn domain(&self) -> Option<&str> {
None
}
}
impl ToSocketAddrsDomain for &str {
fn domain(&self) -> Option<&str> {
self.split(':').next()
}
}
impl ToSocketAddrsDomain for (&str, u16) {
fn domain(&self) -> Option<&str> {
self.0.domain()
}
}
#[cfg(any(feature = "default", feature = "proxy"))]
impl ToSocketAddrsDomain for TargetAddr {
fn domain(&self) -> Option<&str> {
match self {
TargetAddr::Ip(_) => None,
TargetAddr::Domain(domain, _) => Some(domain.as_str()),
}
}
}
macro_rules! impl_to_socket_addrs_domain {
( $ty:ty ) => {
impl ToSocketAddrsDomain for $ty {}
};
}
impl_to_socket_addrs_domain!(std::net::SocketAddr);
impl_to_socket_addrs_domain!(std::net::SocketAddrV4);
impl_to_socket_addrs_domain!(std::net::SocketAddrV6);
impl_to_socket_addrs_domain!((std::net::IpAddr, u16));
impl_to_socket_addrs_domain!((std::net::Ipv4Addr, u16));
impl_to_socket_addrs_domain!((std::net::Ipv6Addr, u16));
#[derive(Debug)]
pub struct GenericNotification {
#[allow(dead_code)]
method: String,
#[allow(dead_code)]
result: serde_json::Value,
}
#[derive(Debug)]
pub struct RawClient<S>
where
S: Read + Write,
{
stream: Mutex<ClonableStream<S>>,
buf_reader: Mutex<BufReader<ClonableStream<S>>>,
last_id: AtomicUsize,
waiting_map: Mutex<HashMap<usize, Sender<ChannelMessage>>>,
subscription_notification: Mutex<VecDeque<GenericNotification>>,
#[cfg(feature = "debug-calls")]
calls: AtomicUsize,
}
impl<S> From<S> for RawClient<S>
where
S: Read + Write,
{
fn from(stream: S) -> Self {
let stream: ClonableStream<_> = stream.into();
Self {
buf_reader: Mutex::new(BufReader::new(stream.clone())),
stream: Mutex::new(stream),
last_id: AtomicUsize::new(0),
waiting_map: Mutex::new(HashMap::new()),
subscription_notification: Mutex::new(VecDeque::new()),
#[cfg(feature = "debug-calls")]
calls: AtomicUsize::new(0),
}
}
}
pub type ElectrumPlaintextStream = TcpStream;
impl RawClient<ElectrumPlaintextStream> {
pub fn new<A: ToSocketAddrs>(
socket_addrs: A,
timeout: Option<Duration>,
) -> Result<Self, Error> {
let stream = match timeout {
Some(timeout) => {
let stream = connect_with_total_timeout(socket_addrs, timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
stream
}
None => TcpStream::connect(socket_addrs)?,
};
Ok(stream.into())
}
}
fn connect_with_total_timeout<A: ToSocketAddrs>(
socket_addrs: A,
mut timeout: Duration,
) -> Result<TcpStream, Error> {
let mut errors = Vec::new();
let addrs = socket_addrs
.to_socket_addrs()?
.enumerate()
.collect::<Vec<_>>();
for (index, addr) in &addrs {
if *index < addrs.len() - 1 {
timeout = timeout.div_f32(2.0);
}
info!(
"Trying to connect to {} (attempt {}/{}) with timeout {:?}",
addr,
index + 1,
addrs.len(),
timeout
);
match TcpStream::connect_timeout(addr, timeout) {
Ok(socket) => return Ok(socket),
Err(e) => {
warn!("Connection error: {:?}", e);
errors.push(e.into());
}
}
}
Err(Error::AllAttemptsErrored(errors))
}
#[cfg(feature = "use-openssl")]
pub type ElectrumSslStream = SslStream<TcpStream>;
#[cfg(feature = "use-openssl")]
impl RawClient<ElectrumSslStream> {
pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
socket_addrs: A,
validate_domain: bool,
timeout: Option<Duration>,
) -> Result<Self, Error> {
debug!(
"new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
socket_addrs.domain(),
validate_domain,
timeout
);
if validate_domain {
socket_addrs.domain().ok_or(Error::MissingDomain)?;
}
match timeout {
Some(timeout) => {
let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
}
None => {
let stream = TcpStream::connect(socket_addrs.clone())?;
Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
}
}
}
pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
socket_addrs: A,
validate_domain: bool,
stream: TcpStream,
) -> Result<Self, Error> {
let mut builder =
SslConnector::builder(SslMethod::tls()).map_err(Error::InvalidSslMethod)?;
if validate_domain {
socket_addrs.domain().ok_or(Error::MissingDomain)?;
} else {
builder.set_verify(SslVerifyMode::NONE);
}
let connector = builder.build();
let domain = socket_addrs.domain().unwrap_or("NONE").to_string();
let stream = connector
.connect(&domain, stream)
.map_err(Error::SslHandshakeError)?;
Ok(stream.into())
}
}
#[cfg(all(
any(
feature = "default",
feature = "use-rustls",
feature = "use-rustls-ring"
),
not(feature = "use-openssl")
))]
mod danger {
use super::ServerName;
use rustls::DigitallySignedStruct;
use rustls::client::danger::{HandshakeSignatureValid, ServerCertVerified};
use rustls::crypto::CryptoProvider;
use rustls::pki_types::{CertificateDer, UnixTime};
#[derive(Debug)]
pub struct NoCertificateVerification(CryptoProvider);
impl NoCertificateVerification {
pub fn new(provider: CryptoProvider) -> Self {
Self(provider)
}
}
impl rustls::client::danger::ServerCertVerifier for NoCertificateVerification {
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp: &[u8],
_now: UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
self.0.signature_verification_algorithms.supported_schemes()
}
}
}
#[cfg(all(
any(
feature = "default",
feature = "use-rustls",
feature = "use-rustls-ring"
),
not(feature = "use-openssl")
))]
pub type ElectrumSslStream = StreamOwned<ClientConnection, TcpStream>;
#[cfg(all(
any(
feature = "default",
feature = "use-rustls",
feature = "use-rustls-ring"
),
not(feature = "use-openssl")
))]
impl RawClient<ElectrumSslStream> {
pub fn new_ssl<A: ToSocketAddrsDomain + Clone>(
socket_addrs: A,
validate_domain: bool,
timeout: Option<Duration>,
) -> Result<Self, Error> {
debug!(
"new_ssl socket_addrs.domain():{:?} validate_domain:{} timeout:{:?}",
socket_addrs.domain(),
validate_domain,
timeout
);
if validate_domain {
socket_addrs.domain().ok_or(Error::MissingDomain)?;
}
match timeout {
Some(timeout) => {
let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
}
None => {
let stream = TcpStream::connect(socket_addrs.clone())?;
Self::new_ssl_from_stream(socket_addrs, validate_domain, stream)
}
}
}
pub fn new_ssl_from_stream<A: ToSocketAddrsDomain>(
socket_addr: A,
validate_domain: bool,
tcp_stream: TcpStream,
) -> Result<Self, Error> {
use std::convert::TryFrom;
if rustls::crypto::CryptoProvider::get_default().is_none() {
#[cfg(all(feature = "use-rustls", not(feature = "use-rustls-ring")))]
rustls::crypto::CryptoProvider::install_default(
rustls::crypto::aws_lc_rs::default_provider(),
)
.map_err(|_| {
Error::CouldNotCreateConnection(rustls::Error::General(
"Failed to install CryptoProvider".to_string(),
))
})?;
#[cfg(feature = "use-rustls-ring")]
rustls::crypto::CryptoProvider::install_default(
rustls::crypto::ring::default_provider(),
)
.map_err(|_| {
Error::CouldNotCreateConnection(rustls::Error::General(
"Failed to install CryptoProvider".to_string(),
))
})?;
}
let builder = ClientConfig::builder();
let config = if validate_domain {
socket_addr.domain().ok_or(Error::MissingDomain)?;
let store = webpki_roots::TLS_SERVER_ROOTS
.iter()
.map(|t| TrustAnchor {
subject: Der::from_slice(t.subject),
subject_public_key_info: Der::from_slice(t.spki),
name_constraints: t.name_constraints.map(Der::from_slice),
})
.collect::<RootCertStore>();
builder.with_root_certificates(store).with_no_client_auth()
} else {
builder
.dangerous()
.with_custom_certificate_verifier(std::sync::Arc::new(
#[cfg(all(feature = "use-rustls", not(feature = "use-rustls-ring")))]
danger::NoCertificateVerification::new(rustls::crypto::aws_lc_rs::default_provider()),
#[cfg(feature = "use-rustls-ring")]
danger::NoCertificateVerification::new(rustls::crypto::ring::default_provider()),
))
.with_no_client_auth()
};
let domain = socket_addr.domain().unwrap_or("NONE").to_string();
let session = ClientConnection::new(
std::sync::Arc::new(config),
ServerName::try_from(domain.clone())
.map_err(|_| Error::InvalidDNSNameError(domain.clone()))?,
)
.map_err(Error::CouldNotCreateConnection)?;
let stream = StreamOwned::new(session, tcp_stream);
Ok(stream.into())
}
}
#[cfg(any(feature = "default", feature = "proxy"))]
pub type ElectrumProxyStream = Socks5Stream;
#[cfg(any(feature = "default", feature = "proxy"))]
impl RawClient<ElectrumProxyStream> {
pub fn new_proxy<T: ToTargetAddr>(
target_addr: T,
proxy: &crate::Socks5Config,
timeout: Option<Duration>,
) -> Result<Self, Error> {
let mut stream = match proxy.credentials.as_ref() {
Some(cred) => Socks5Stream::connect_with_password(
&proxy.addr,
target_addr,
&cred.username,
&cred.password,
timeout,
)?,
None => Socks5Stream::connect(&proxy.addr, target_addr, timeout)?,
};
stream.get_mut().set_read_timeout(timeout)?;
stream.get_mut().set_write_timeout(timeout)?;
Ok(stream.into())
}
#[cfg(any(
feature = "use-openssl",
feature = "use-rustls",
feature = "use-rustls-ring"
))]
pub fn new_proxy_ssl<T: ToTargetAddr>(
target_addr: T,
validate_domain: bool,
proxy: &crate::Socks5Config,
timeout: Option<Duration>,
) -> Result<RawClient<ElectrumSslStream>, Error> {
let target = target_addr.to_target_addr()?;
let mut stream = match proxy.credentials.as_ref() {
Some(cred) => Socks5Stream::connect_with_password(
&proxy.addr,
target_addr,
&cred.username,
&cred.password,
timeout,
)?,
None => Socks5Stream::connect(&proxy.addr, target.clone(), timeout)?,
};
stream.get_mut().set_read_timeout(timeout)?;
stream.get_mut().set_write_timeout(timeout)?;
RawClient::new_ssl_from_stream(target, validate_domain, stream.into_inner())
}
}
#[cfg(feature = "use-websocket")]
pub type ElectrumWsStream = WebSocketWrapper<TcpStream>;
#[cfg(feature = "use-websocket")]
impl RawClient<ElectrumWsStream> {
pub fn new_ws<A: ToSocketAddrsDomain + Clone>(
socket_addrs: A,
timeout: Option<Duration>,
max_message_size: Option<usize>,
) -> Result<Self, Error> {
let domain = socket_addrs.domain().unwrap_or("localhost").to_string();
let stream = match timeout {
Some(timeout) => {
let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
stream
}
None => TcpStream::connect(socket_addrs)?,
};
let url = format!("ws://{}/", domain);
let ws_config = tungstenite::protocol::WebSocketConfig::default()
.max_frame_size(max_message_size)
.max_message_size(max_message_size);
let (socket, _response) =
tungstenite::client::client_with_config(url, stream, Some(ws_config))
.map_err(|e| Error::Message(format!("WebSocket handshake failed: {}", e)))?;
Ok(WebSocketWrapper::new(socket).into())
}
}
#[cfg(all(
feature = "use-websocket",
any(feature = "use-rustls", feature = "use-rustls-ring"),
not(feature = "use-openssl")
))]
pub type ElectrumWssStream = WebSocketWrapper<StreamOwned<ClientConnection, TcpStream>>;
#[cfg(all(
feature = "use-websocket",
any(feature = "use-rustls", feature = "use-rustls-ring"),
not(feature = "use-openssl")
))]
impl RawClient<ElectrumWssStream> {
pub fn new_wss<A: ToSocketAddrsDomain + Clone>(
socket_addrs: A,
validate_domain: bool,
timeout: Option<Duration>,
max_message_size: Option<usize>,
) -> Result<Self, Error> {
use std::convert::TryFrom;
let domain = socket_addrs
.domain()
.ok_or(Error::MissingDomain)?
.to_string();
let stream = match timeout {
Some(timeout) => {
let stream = connect_with_total_timeout(socket_addrs.clone(), timeout)?;
stream.set_read_timeout(Some(timeout))?;
stream.set_write_timeout(Some(timeout))?;
stream
}
None => TcpStream::connect(socket_addrs)?,
};
if rustls::crypto::CryptoProvider::get_default().is_none() {
#[cfg(all(feature = "use-rustls", not(feature = "use-rustls-ring")))]
rustls::crypto::CryptoProvider::install_default(
rustls::crypto::aws_lc_rs::default_provider(),
)
.map_err(|_| {
Error::CouldNotCreateConnection(rustls::Error::General(
"Failed to install CryptoProvider".to_string(),
))
})?;
#[cfg(feature = "use-rustls-ring")]
rustls::crypto::CryptoProvider::install_default(
rustls::crypto::ring::default_provider(),
)
.map_err(|_| {
Error::CouldNotCreateConnection(rustls::Error::General(
"Failed to install CryptoProvider".to_string(),
))
})?;
}
let builder = ClientConfig::builder();
let config = if validate_domain {
let store = webpki_roots::TLS_SERVER_ROOTS
.iter()
.map(|t| TrustAnchor {
subject: Der::from_slice(t.subject),
subject_public_key_info: Der::from_slice(t.spki),
name_constraints: t.name_constraints.map(Der::from_slice),
})
.collect::<RootCertStore>();
builder.with_root_certificates(store).with_no_client_auth()
} else {
builder
.dangerous()
.with_custom_certificate_verifier(std::sync::Arc::new(
#[cfg(all(feature = "use-rustls", not(feature = "use-rustls-ring")))]
danger::NoCertificateVerification::new(
rustls::crypto::aws_lc_rs::default_provider(),
),
#[cfg(feature = "use-rustls-ring")]
danger::NoCertificateVerification::new(
rustls::crypto::ring::default_provider(),
),
))
.with_no_client_auth()
};
let session = ClientConnection::new(
std::sync::Arc::new(config),
ServerName::try_from(domain.clone())
.map_err(|_| Error::InvalidDNSNameError(domain.clone()))?,
)
.map_err(Error::CouldNotCreateConnection)?;
let tls_stream = StreamOwned::new(session, stream);
let url = format!("wss://{}/", domain);
let ws_config = tungstenite::protocol::WebSocketConfig::default()
.max_frame_size(max_message_size)
.max_message_size(max_message_size);
let (socket, _response) =
tungstenite::client::client_with_config(url, tls_stream, Some(ws_config))
.map_err(|e| Error::Message(format!("WebSocket handshake failed: {}", e)))?;
Ok(WebSocketWrapper::new(socket).into())
}
}
#[derive(Debug)]
enum ChannelMessage {
Response(serde_json::Value),
WakeUp,
Error(Arc<std::io::Error>),
}
impl<S: Read + Write> RawClient<S> {
fn _reader_thread(&self, until_message: Option<usize>) -> Result<serde_json::Value, Error> {
let mut raw_resp = String::new();
let resp = match self.buf_reader.try_lock() {
Ok(mut reader) => {
trace!(
"Starting reader thread with `until_message` = {:?}",
until_message
);
if let Some(until_message) = until_message {
if self.waiting_map.lock()?.get(&until_message).is_none() {
return Err(Error::CouldntLockReader);
}
}
loop {
raw_resp.clear();
if let Err(e) = reader.read_line(&mut raw_resp) {
let error = Arc::new(e);
for (_, s) in self.waiting_map.lock().unwrap().drain() {
s.send(ChannelMessage::Error(error.clone()))?;
}
return Err(Error::SharedIOError(error));
}
trace!("<== {}", raw_resp);
let resp: serde_json::Value = serde_json::from_str(&raw_resp)?;
let resp_id = resp["id"]
.as_str()
.and_then(|s| s.parse().ok())
.or_else(|| resp["id"].as_u64().map(|i| i as usize));
match resp_id {
Some(resp_id) if until_message == Some(resp_id) => {
trace!(
"Reader thread {} received a response for its request",
resp_id
);
let mut map = self.waiting_map.lock()?;
map.remove(&resp_id);
if let Some(err) = map.values().find_map(|sender| {
sender
.send(ChannelMessage::WakeUp)
.inspect_err(|_| {
warn!("Unable to wake up a thread, trying some other");
})
.err()
}) {
error!("All the threads have failed, giving up");
return Err(err)?;
}
break Ok(resp);
}
Some(resp_id) => {
trace!("Reader thread received response for {}", resp_id);
if let Some(sender) = self.waiting_map.lock()?.remove(&resp_id) {
sender.send(ChannelMessage::Response(resp))?;
} else {
warn!("Missing listener for {}", resp_id);
}
}
None => {
let mut resp = resp;
if let Some(method) = resp["method"].take().as_str() {
self.handle_notification(method, resp["params"].take())?;
} else {
warn!("Unexpected response: {:?}", resp);
}
}
}
}
}
Err(TryLockError::WouldBlock) => {
Err(Error::CouldntLockReader)
}
Err(TryLockError::Poisoned(e)) => Err(e)?,
};
let resp = resp?;
if let Some(err) = resp.get("error") {
Err(Error::Protocol(err.clone()))
} else {
Ok(resp)
}
}
fn call(&self, req: Request) -> Result<serde_json::Value, Error> {
let (sender, receiver) = channel();
self.waiting_map.lock()?.insert(req.id, sender);
let mut raw = serde_json::to_vec(&req)?;
trace!("==> {}", String::from_utf8_lossy(&raw));
raw.extend_from_slice(b"\n");
let mut stream = self.stream.lock()?;
stream.write_all(&raw)?;
stream.flush()?;
drop(stream);
self.increment_calls();
let mut resp = match self.recv(&receiver, req.id) {
Ok(resp) => resp,
e @ Err(_) => {
self.waiting_map.lock()?.remove(&req.id);
return e;
}
};
Ok(resp["result"].take())
}
fn recv(
&self,
receiver: &Receiver<ChannelMessage>,
req_id: usize,
) -> Result<serde_json::Value, Error> {
loop {
match self._reader_thread(Some(req_id)) {
Ok(response) => break Ok(response),
Err(Error::CouldntLockReader) => {
match receiver.recv()? {
ChannelMessage::Response(received) => break Ok(received),
ChannelMessage::WakeUp => {
trace!("WakeUp for {}", req_id);
continue;
}
ChannelMessage::Error(e) => {
warn!("Received ChannelMessage::Error");
break Err(Error::SharedIOError(e));
}
}
}
e @ Err(_) => break e,
}
}
}
fn handle_notification(&self, method: &str, result: serde_json::Value) -> Result<(), Error> {
let max_notifications = 1000;
let mut notifications = self.subscription_notification.lock()?;
if notifications.len() >= max_notifications {
notifications.pop_front();
}
notifications.push_back(GenericNotification {
method: method.to_string(),
result,
});
Ok(())
}
pub(crate) fn internal_raw_call_with_vec(
&self,
method_name: &str,
params: Vec<Param>,
) -> Result<serde_json::Value, Error> {
let req = Request::new_id(
self.last_id.fetch_add(1, Ordering::SeqCst),
method_name,
params,
);
let result = self.call(req)?;
Ok(result)
}
#[inline]
#[cfg(feature = "debug-calls")]
fn increment_calls(&self) {
self.calls.fetch_add(1, Ordering::SeqCst);
}
#[inline]
#[cfg(not(feature = "debug-calls"))]
fn increment_calls(&self) {}
}
impl<T: Read + Write> ElectrumApi for RawClient<T> {
fn raw_call(
&self,
method_name: &str,
params: impl IntoIterator<Item = Param>,
) -> Result<serde_json::Value, Error> {
self.internal_raw_call_with_vec(method_name, params.into_iter().collect())
}
fn batch_call(&self, batch: &Batch) -> Result<Vec<serde_json::Value>, Error> {
let mut raw = Vec::new();
let mut missing_responses = Vec::new();
let mut answers = BTreeMap::new();
for (method, params) in batch.iter() {
let req = Request::new_id(
self.last_id.fetch_add(1, Ordering::SeqCst),
method,
params.to_vec(),
);
let (sender, receiver) = channel();
missing_responses.push((req.id, receiver));
self.waiting_map.lock()?.insert(req.id, sender);
raw.append(&mut serde_json::to_vec(&req)?);
raw.extend_from_slice(b"\n");
}
if missing_responses.is_empty() {
return Ok(vec![]);
}
trace!("==> {}", String::from_utf8_lossy(&raw));
let mut stream = self.stream.lock()?;
stream.write_all(&raw)?;
stream.flush()?;
drop(stream);
self.increment_calls();
for (req_id, receiver) in missing_responses.iter() {
match self.recv(receiver, *req_id) {
Ok(mut resp) => answers.insert(req_id, resp["result"].take()),
Err(e) => {
warn!("got error for req_id {}: {:?}", req_id, e);
warn!("removing all waiting req of this batch");
let mut guard = self.waiting_map.lock()?;
for (req_id, _) in missing_responses.iter() {
guard.remove(req_id);
}
return Err(e);
}
};
}
Ok(answers.into_values().collect())
}
fn ping(&self) -> Result<(), Error> {
let req = Request::new_id(
self.last_id.fetch_add(1, Ordering::SeqCst),
"server.ping",
vec![],
);
self.call(req)?;
Ok(())
}
#[cfg(feature = "debug-calls")]
fn calls_made(&self) -> Result<usize, Error> {
Ok(self.calls.load(Ordering::SeqCst))
}
}
#[cfg(test)]
mod test {
use super::RawClient;
use crate::api::ElectrumApi;
fn get_test_server() -> String {
std::env::var("TEST_ELECTRUM_SERVER").unwrap_or("electrum.blockstream.info:50001".into())
}
#[test]
fn test_ping() {
let client = RawClient::new(get_test_server(), None).unwrap();
client.ping().unwrap();
}
#[test]
fn test_raw_call() {
use crate::types::Param;
let client = RawClient::new(get_test_server(), None).unwrap();
let params = vec![
Param::String(
"cc2ca076fd04c2aeed6d02151c447ced3d09be6fb4d4ef36cb5ed4e7a3260566".to_string(),
),
Param::Bool(false),
];
let resp = client
.raw_call("blockchain.transaction.get", params)
.unwrap();
assert_eq!(
resp,
"01000000000101000000000000000000000000000000000000000000000000000\
0000000000000ffffffff5403f09c091b4d696e656420627920416e74506f6f6c3\
13139ae006f20074d6528fabe6d6d2ab1948d50b3d991e2a0821df74358ed9c255\
3af00c7a61f97771ca0acee106e0400000000000000cbec00802461f905fffffff\
f0354ceac2a000000001976a91411dbe48cc6b617f9c6adaf4d9ed5f625b1c7cb5\
988ac0000000000000000266a24aa21a9ed2e578bce2ca6c6bc9359377345d8e98\
5dd5f90c78421ffa6efa5eb60428e698c0000000000000000266a24b9e11b6d2f6\
21d7ec3f45a5eca89d3ea6a294cdf3a042e973009584470a12916111e2caa01200\
000000000000000000000000000000000000000000000000000000000000000000\
00000"
)
}
}