rustywallet_electrum/
transport.rs1use std::sync::Arc;
6
7use rustls::{
8 client::{ServerCertVerified, ServerCertVerifier},
9 Certificate, ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore, ServerName,
10};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
13use tokio::net::TcpStream;
14use tokio::sync::Mutex;
15use tokio_rustls::client::TlsStream;
16use tokio_rustls::TlsConnector;
17
18use crate::error::{ElectrumError, Result};
19use crate::types::ClientConfig;
20
21struct NoVerifier;
23
24impl ServerCertVerifier for NoVerifier {
25 fn verify_server_cert(
26 &self,
27 _end_entity: &Certificate,
28 _intermediates: &[Certificate],
29 _server_name: &ServerName,
30 _scts: &mut dyn Iterator<Item = &[u8]>,
31 _ocsp_response: &[u8],
32 _now: std::time::SystemTime,
33 ) -> std::result::Result<ServerCertVerified, rustls::Error> {
34 Ok(ServerCertVerified::assertion())
35 }
36}
37
38#[derive(Debug, Serialize)]
40pub struct JsonRpcRequest<'a> {
41 pub jsonrpc: &'static str,
43 pub id: u64,
45 pub method: &'a str,
47 pub params: Vec<serde_json::Value>,
49}
50
51impl<'a> JsonRpcRequest<'a> {
52 pub fn new(id: u64, method: &'a str, params: Vec<serde_json::Value>) -> Self {
54 Self {
55 jsonrpc: "2.0",
56 id,
57 method,
58 params,
59 }
60 }
61}
62
63#[derive(Debug, Deserialize)]
65pub struct JsonRpcResponse {
66 pub jsonrpc: String,
68 pub id: Option<u64>,
70 pub result: Option<serde_json::Value>,
72 pub error: Option<JsonRpcError>,
74}
75
76#[derive(Debug, Deserialize)]
78pub struct JsonRpcError {
79 pub code: i32,
81 pub message: String,
83}
84
85enum Connection {
87 Tcp(BufReader<TcpStream>),
88 Tls(Box<BufReader<TlsStream<TcpStream>>>),
89}
90
91pub struct Transport {
93 connection: Mutex<Connection>,
94 config: ClientConfig,
95}
96
97impl Transport {
98 pub async fn connect(config: ClientConfig) -> Result<Self> {
100 let connection = if config.use_tls {
101 Self::connect_tls(&config).await?
102 } else {
103 Self::connect_tcp(&config).await?
104 };
105
106 Ok(Self {
107 connection: Mutex::new(connection),
108 config,
109 })
110 }
111
112 async fn connect_tcp(config: &ClientConfig) -> Result<Connection> {
114 let addr = config.address();
115 let stream = tokio::time::timeout(config.timeout, TcpStream::connect(&addr))
116 .await
117 .map_err(|_| ElectrumError::Timeout)?
118 .map_err(|e| ElectrumError::ConnectionFailed(format!("{}: {}", addr, e)))?;
119
120 stream.set_nodelay(true)?;
121 Ok(Connection::Tcp(BufReader::new(stream)))
122 }
123
124 async fn connect_tls(config: &ClientConfig) -> Result<Connection> {
126 let addr = config.address();
127
128 let stream = tokio::time::timeout(config.timeout, TcpStream::connect(&addr))
130 .await
131 .map_err(|_| ElectrumError::Timeout)?
132 .map_err(|e| ElectrumError::ConnectionFailed(format!("{}: {}", addr, e)))?;
133
134 stream.set_nodelay(true)?;
135
136 let tls_config = if config.skip_tls_verify {
138 RustlsConfig::builder()
140 .with_safe_defaults()
141 .with_custom_certificate_verifier(Arc::new(NoVerifier))
142 .with_no_client_auth()
143 } else {
144 let mut root_store = RootCertStore::empty();
146 root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
147 OwnedTrustAnchor::from_subject_spki_name_constraints(
148 ta.subject,
149 ta.spki,
150 ta.name_constraints,
151 )
152 }));
153
154 RustlsConfig::builder()
155 .with_safe_defaults()
156 .with_root_certificates(root_store)
157 .with_no_client_auth()
158 };
159
160 let connector = TlsConnector::from(Arc::new(tls_config));
161
162 let server_name = ServerName::try_from(config.server.as_str())
163 .map_err(|e| ElectrumError::TlsError(format!("Invalid server name: {}", e)))?;
164
165 let tls_stream = tokio::time::timeout(config.timeout, connector.connect(server_name, stream))
166 .await
167 .map_err(|_| ElectrumError::Timeout)?
168 .map_err(|e| ElectrumError::TlsError(e.to_string()))?;
169
170 Ok(Connection::Tls(Box::new(BufReader::new(tls_stream))))
171 }
172
173 pub async fn request(
175 &self,
176 id: u64,
177 method: &str,
178 params: Vec<serde_json::Value>,
179 ) -> Result<serde_json::Value> {
180 let request = JsonRpcRequest::new(id, method, params);
181 let request_json = serde_json::to_string(&request)?;
182
183 let mut conn = self.connection.lock().await;
184
185 let request_line = format!("{}\n", request_json);
187
188 match &mut *conn {
189 Connection::Tcp(reader) => {
190 reader.get_mut().write_all(request_line.as_bytes()).await?;
191 reader.get_mut().flush().await?;
192 }
193 Connection::Tls(reader) => {
194 reader.get_mut().write_all(request_line.as_bytes()).await?;
195 reader.get_mut().flush().await?;
196 }
197 }
198
199 let mut response_line = String::new();
201
202 let bytes_read = tokio::time::timeout(self.config.timeout, async {
203 match &mut *conn {
204 Connection::Tcp(reader) => reader.read_line(&mut response_line).await,
205 Connection::Tls(reader) => reader.read_line(&mut response_line).await,
206 }
207 })
208 .await
209 .map_err(|_| ElectrumError::Timeout)??;
210
211 if bytes_read == 0 {
212 return Err(ElectrumError::Disconnected);
213 }
214
215 let response: JsonRpcResponse = serde_json::from_str(&response_line)?;
217
218 if let Some(error) = response.error {
220 return Err(ElectrumError::ServerError {
221 code: error.code,
222 message: error.message,
223 });
224 }
225
226 if let Some(resp_id) = response.id {
228 if resp_id != id {
229 return Err(ElectrumError::IdMismatch {
230 expected: id,
231 got: resp_id,
232 });
233 }
234 }
235
236 Ok(response.result.unwrap_or(serde_json::Value::Null))
238 }
239
240 pub async fn batch_request(
242 &self,
243 requests: Vec<(u64, &str, Vec<serde_json::Value>)>,
244 ) -> Result<Vec<serde_json::Value>> {
245 if requests.is_empty() {
246 return Ok(vec![]);
247 }
248
249 let batch: Vec<JsonRpcRequest> = requests
250 .iter()
251 .map(|(id, method, params)| JsonRpcRequest::new(*id, method, params.clone()))
252 .collect();
253
254 let request_json = serde_json::to_string(&batch)?;
255
256 let mut conn = self.connection.lock().await;
257
258 let request_line = format!("{}\n", request_json);
260
261 match &mut *conn {
262 Connection::Tcp(reader) => {
263 reader.get_mut().write_all(request_line.as_bytes()).await?;
264 reader.get_mut().flush().await?;
265 }
266 Connection::Tls(reader) => {
267 reader.get_mut().write_all(request_line.as_bytes()).await?;
268 reader.get_mut().flush().await?;
269 }
270 }
271
272 let mut response_line = String::new();
274
275 let bytes_read = tokio::time::timeout(self.config.timeout, async {
276 match &mut *conn {
277 Connection::Tcp(reader) => reader.read_line(&mut response_line).await,
278 Connection::Tls(reader) => reader.read_line(&mut response_line).await,
279 }
280 })
281 .await
282 .map_err(|_| ElectrumError::Timeout)??;
283
284 if bytes_read == 0 {
285 return Err(ElectrumError::Disconnected);
286 }
287
288 let responses: Vec<JsonRpcResponse> = serde_json::from_str(&response_line)?;
290
291 let mut results = Vec::with_capacity(responses.len());
293 for response in responses {
294 if let Some(error) = response.error {
295 return Err(ElectrumError::ServerError {
296 code: error.code,
297 message: error.message,
298 });
299 }
300 results.push(
301 response
302 .result
303 .ok_or_else(|| ElectrumError::InvalidResponse("Missing result".into()))?,
304 );
305 }
306
307 Ok(results)
308 }
309
310 pub fn config(&self) -> &ClientConfig {
312 &self.config
313 }
314}