use std::sync::Arc;
use rustls::{
client::{ServerCertVerified, ServerCertVerifier},
Certificate, ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore, ServerName,
};
use serde::{Deserialize, Serialize};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::net::TcpStream;
use tokio::sync::Mutex;
use tokio_rustls::client::TlsStream;
use tokio_rustls::TlsConnector;
use crate::error::{ElectrumError, Result};
use crate::types::ClientConfig;
/// Certificate verifier that accepts every server certificate without inspection.
///
/// Installing this verifier removes TLS server authentication entirely; in this
/// file it is only wired in when the client configuration sets `skip_tls_verify`.
struct NoVerifier;

impl ServerCertVerifier for NoVerifier {
    /// Reports the presented certificate chain as verified, ignoring all inputs.
    fn verify_server_cert(
        &self,
        _: &Certificate,
        _: &[Certificate],
        _: &ServerName,
        _: &mut dyn Iterator<Item = &[u8]>,
        _: &[u8],
        _: std::time::SystemTime,
    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
        // None of the arguments are consulted: the chain, the server name, the
        // SCTs, the OCSP response and the current time are all disregarded.
        let accepted = ServerCertVerified::assertion();
        Ok(accepted)
    }
}
/// Outgoing JSON-RPC request envelope.
///
/// The method name is borrowed from the caller; the parameter list is owned.
/// Fields are serialized in declaration order.
#[derive(Debug, Serialize)]
pub struct JsonRpcRequest<'a> {
    /// Protocol version tag; [`JsonRpcRequest::new`] always sets it to `"2.0"`.
    pub jsonrpc: &'static str,
    /// Caller-chosen identifier for this request.
    pub id: u64,
    /// Name of the remote method to invoke.
    pub method: &'a str,
    /// Positional arguments for the method.
    pub params: Vec<serde_json::Value>,
}

impl<'a> JsonRpcRequest<'a> {
    /// Version tag stamped on every request built by [`JsonRpcRequest::new`].
    const VERSION: &'static str = "2.0";

    /// Builds a request for `method` with the given `id` and positional `params`.
    pub fn new(id: u64, method: &'a str, params: Vec<serde_json::Value>) -> Self {
        JsonRpcRequest {
            params,
            method,
            id,
            jsonrpc: Self::VERSION,
        }
    }
}
/// A JSON-RPC response object as deserialized from a line sent by the server.
///
/// `id`, `result` and `error` are all optional: a member that is absent or
/// JSON `null` deserializes to `None`, so a `null` result cannot be told apart
/// from a missing one through this type.
#[derive(Debug, Deserialize)]
pub struct JsonRpcResponse {
/// Protocol version string reported by the server. It is not optional, so
/// deserialization fails when the member is absent.
pub jsonrpc: String,
/// Identifier echoed by the server, if any.
pub id: Option<u64>,
/// Result payload, if any.
pub result: Option<serde_json::Value>,
/// Error object, if the server reported one.
pub error: Option<JsonRpcError>,
}
/// The `error` member of a JSON-RPC response.
///
/// Only `code` and `message` are captured; any other members of the error
/// object are ignored during deserialization.
#[derive(Debug, Deserialize)]
pub struct JsonRpcError {
/// Numeric error code reported by the server.
pub code: i32,
/// Error description reported by the server.
pub message: String,
}
/// The underlying stream of a [`Transport`], wrapped in a `BufReader` so that
/// replies can be read line by line.
enum Connection {
/// Plain TCP stream.
Tcp(BufReader<TcpStream>),
/// TLS stream layered over TCP.
// NOTE(review): presumably boxed to keep the enum small, since the TLS
// stream is much larger than a bare TCP stream - confirm.
Tls(Box<BufReader<TlsStream<TcpStream>>>),
}
/// A single connection to a server together with the configuration that was
/// used to open it.
pub struct Transport {
/// The live stream. The async mutex is held by a request from the moment it
/// writes until it has read its reply, so exchanges do not interleave.
connection: Mutex<Connection>,
/// Settings supplied to `connect`; the timeout is reused for every request.
config: ClientConfig,
}
impl Transport {
pub async fn connect(config: ClientConfig) -> Result<Self> {
let connection = if config.use_tls {
Self::connect_tls(&config).await?
} else {
Self::connect_tcp(&config).await?
};
Ok(Self {
connection: Mutex::new(connection),
config,
})
}
async fn connect_tcp(config: &ClientConfig) -> Result<Connection> {
let addr = config.address();
let stream = tokio::time::timeout(config.timeout, TcpStream::connect(&addr))
.await
.map_err(|_| ElectrumError::Timeout)?
.map_err(|e| ElectrumError::ConnectionFailed(format!("{}: {}", addr, e)))?;
stream.set_nodelay(true)?;
Ok(Connection::Tcp(BufReader::new(stream)))
}
async fn connect_tls(config: &ClientConfig) -> Result<Connection> {
let addr = config.address();
let stream = tokio::time::timeout(config.timeout, TcpStream::connect(&addr))
.await
.map_err(|_| ElectrumError::Timeout)?
.map_err(|e| ElectrumError::ConnectionFailed(format!("{}: {}", addr, e)))?;
stream.set_nodelay(true)?;
let tls_config = if config.skip_tls_verify {
RustlsConfig::builder()
.with_safe_defaults()
.with_custom_certificate_verifier(Arc::new(NoVerifier))
.with_no_client_auth()
} else {
let mut root_store = RootCertStore::empty();
root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
OwnedTrustAnchor::from_subject_spki_name_constraints(
ta.subject,
ta.spki,
ta.name_constraints,
)
}));
RustlsConfig::builder()
.with_safe_defaults()
.with_root_certificates(root_store)
.with_no_client_auth()
};
let connector = TlsConnector::from(Arc::new(tls_config));
let server_name = ServerName::try_from(config.server.as_str())
.map_err(|e| ElectrumError::TlsError(format!("Invalid server name: {}", e)))?;
let tls_stream = tokio::time::timeout(config.timeout, connector.connect(server_name, stream))
.await
.map_err(|_| ElectrumError::Timeout)?
.map_err(|e| ElectrumError::TlsError(e.to_string()))?;
Ok(Connection::Tls(Box::new(BufReader::new(tls_stream))))
}
pub async fn request(
&self,
id: u64,
method: &str,
params: Vec<serde_json::Value>,
) -> Result<serde_json::Value> {
let request = JsonRpcRequest::new(id, method, params);
let request_json = serde_json::to_string(&request)?;
let mut conn = self.connection.lock().await;
let request_line = format!("{}\n", request_json);
match &mut *conn {
Connection::Tcp(reader) => {
reader.get_mut().write_all(request_line.as_bytes()).await?;
reader.get_mut().flush().await?;
}
Connection::Tls(reader) => {
reader.get_mut().write_all(request_line.as_bytes()).await?;
reader.get_mut().flush().await?;
}
}
let mut response_line = String::new();
let bytes_read = tokio::time::timeout(self.config.timeout, async {
match &mut *conn {
Connection::Tcp(reader) => reader.read_line(&mut response_line).await,
Connection::Tls(reader) => reader.read_line(&mut response_line).await,
}
})
.await
.map_err(|_| ElectrumError::Timeout)??;
if bytes_read == 0 {
return Err(ElectrumError::Disconnected);
}
let response: JsonRpcResponse = serde_json::from_str(&response_line)?;
if let Some(error) = response.error {
return Err(ElectrumError::ServerError {
code: error.code,
message: error.message,
});
}
if let Some(resp_id) = response.id {
if resp_id != id {
return Err(ElectrumError::IdMismatch {
expected: id,
got: resp_id,
});
}
}
Ok(response.result.unwrap_or(serde_json::Value::Null))
}
pub async fn batch_request(
&self,
requests: Vec<(u64, &str, Vec<serde_json::Value>)>,
) -> Result<Vec<serde_json::Value>> {
if requests.is_empty() {
return Ok(vec![]);
}
let batch: Vec<JsonRpcRequest> = requests
.iter()
.map(|(id, method, params)| JsonRpcRequest::new(*id, method, params.clone()))
.collect();
let request_json = serde_json::to_string(&batch)?;
let mut conn = self.connection.lock().await;
let request_line = format!("{}\n", request_json);
match &mut *conn {
Connection::Tcp(reader) => {
reader.get_mut().write_all(request_line.as_bytes()).await?;
reader.get_mut().flush().await?;
}
Connection::Tls(reader) => {
reader.get_mut().write_all(request_line.as_bytes()).await?;
reader.get_mut().flush().await?;
}
}
let mut response_line = String::new();
let bytes_read = tokio::time::timeout(self.config.timeout, async {
match &mut *conn {
Connection::Tcp(reader) => reader.read_line(&mut response_line).await,
Connection::Tls(reader) => reader.read_line(&mut response_line).await,
}
})
.await
.map_err(|_| ElectrumError::Timeout)??;
if bytes_read == 0 {
return Err(ElectrumError::Disconnected);
}
let responses: Vec<JsonRpcResponse> = serde_json::from_str(&response_line)?;
let mut results = Vec::with_capacity(responses.len());
for response in responses {
if let Some(error) = response.error {
return Err(ElectrumError::ServerError {
code: error.code,
message: error.message,
});
}
results.push(
response
.result
.ok_or_else(|| ElectrumError::InvalidResponse("Missing result".into()))?,
);
}
Ok(results)
}
pub fn config(&self) -> &ClientConfig {
&self.config
}
}