rustywallet_electrum/
transport.rs

1//! Transport layer for Electrum protocol communication.
2//!
3//! Handles TCP and TLS connections with JSON-RPC framing.
4
5use std::sync::Arc;
6
7use rustls::{
8    client::{ServerCertVerified, ServerCertVerifier},
9    Certificate, ClientConfig as RustlsConfig, OwnedTrustAnchor, RootCertStore, ServerName,
10};
11use serde::{Deserialize, Serialize};
12use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
13use tokio::net::TcpStream;
14use tokio::sync::Mutex;
15use tokio_rustls::client::TlsStream;
16use tokio_rustls::TlsConnector;
17
18use crate::error::{ElectrumError, Result};
19use crate::types::ClientConfig;
20
21/// Dummy certificate verifier that accepts any certificate (INSECURE).
22struct NoVerifier;
23
24impl ServerCertVerifier for NoVerifier {
25    fn verify_server_cert(
26        &self,
27        _end_entity: &Certificate,
28        _intermediates: &[Certificate],
29        _server_name: &ServerName,
30        _scts: &mut dyn Iterator<Item = &[u8]>,
31        _ocsp_response: &[u8],
32        _now: std::time::SystemTime,
33    ) -> std::result::Result<ServerCertVerified, rustls::Error> {
34        Ok(ServerCertVerified::assertion())
35    }
36}
37
38/// JSON-RPC request structure.
39#[derive(Debug, Serialize)]
40pub struct JsonRpcRequest<'a> {
41    /// JSON-RPC version (always "2.0")
42    pub jsonrpc: &'static str,
43    /// Request ID for matching responses
44    pub id: u64,
45    /// Method name to call
46    pub method: &'a str,
47    /// Method parameters
48    pub params: Vec<serde_json::Value>,
49}
50
51impl<'a> JsonRpcRequest<'a> {
52    /// Create a new JSON-RPC request.
53    pub fn new(id: u64, method: &'a str, params: Vec<serde_json::Value>) -> Self {
54        Self {
55            jsonrpc: "2.0",
56            id,
57            method,
58            params,
59        }
60    }
61}
62
63/// JSON-RPC response structure.
64#[derive(Debug, Deserialize)]
65pub struct JsonRpcResponse {
66    /// JSON-RPC version
67    pub jsonrpc: String,
68    /// Response ID matching request
69    pub id: Option<u64>,
70    /// Result value (if successful)
71    pub result: Option<serde_json::Value>,
72    /// Error (if failed)
73    pub error: Option<JsonRpcError>,
74}
75
76/// JSON-RPC error structure.
77#[derive(Debug, Deserialize)]
78pub struct JsonRpcError {
79    /// Error code
80    pub code: i32,
81    /// Error message
82    pub message: String,
83}
84
85/// Transport connection type.
86enum Connection {
87    Tcp(BufReader<TcpStream>),
88    Tls(Box<BufReader<TlsStream<TcpStream>>>),
89}
90
91/// Transport layer for Electrum server communication.
92pub struct Transport {
93    connection: Mutex<Connection>,
94    config: ClientConfig,
95}
96
97impl Transport {
98    /// Create a new transport connection.
99    pub async fn connect(config: ClientConfig) -> Result<Self> {
100        let connection = if config.use_tls {
101            Self::connect_tls(&config).await?
102        } else {
103            Self::connect_tcp(&config).await?
104        };
105
106        Ok(Self {
107            connection: Mutex::new(connection),
108            config,
109        })
110    }
111
112    /// Connect via plain TCP.
113    async fn connect_tcp(config: &ClientConfig) -> Result<Connection> {
114        let addr = config.address();
115        let stream = tokio::time::timeout(config.timeout, TcpStream::connect(&addr))
116            .await
117            .map_err(|_| ElectrumError::Timeout)?
118            .map_err(|e| ElectrumError::ConnectionFailed(format!("{}: {}", addr, e)))?;
119
120        stream.set_nodelay(true)?;
121        Ok(Connection::Tcp(BufReader::new(stream)))
122    }
123
124    /// Connect via TLS/SSL.
125    async fn connect_tls(config: &ClientConfig) -> Result<Connection> {
126        let addr = config.address();
127
128        // Create TCP connection first
129        let stream = tokio::time::timeout(config.timeout, TcpStream::connect(&addr))
130            .await
131            .map_err(|_| ElectrumError::Timeout)?
132            .map_err(|e| ElectrumError::ConnectionFailed(format!("{}: {}", addr, e)))?;
133
134        stream.set_nodelay(true)?;
135
136        // Setup TLS config
137        let tls_config = if config.skip_tls_verify {
138            // INSECURE: Skip certificate validation
139            RustlsConfig::builder()
140                .with_safe_defaults()
141                .with_custom_certificate_verifier(Arc::new(NoVerifier))
142                .with_no_client_auth()
143        } else {
144            // Normal: Use webpki roots for validation
145            let mut root_store = RootCertStore::empty();
146            root_store.add_trust_anchors(webpki_roots::TLS_SERVER_ROOTS.iter().map(|ta| {
147                OwnedTrustAnchor::from_subject_spki_name_constraints(
148                    ta.subject,
149                    ta.spki,
150                    ta.name_constraints,
151                )
152            }));
153
154            RustlsConfig::builder()
155                .with_safe_defaults()
156                .with_root_certificates(root_store)
157                .with_no_client_auth()
158        };
159
160        let connector = TlsConnector::from(Arc::new(tls_config));
161
162        let server_name = ServerName::try_from(config.server.as_str())
163            .map_err(|e| ElectrumError::TlsError(format!("Invalid server name: {}", e)))?;
164
165        let tls_stream = tokio::time::timeout(config.timeout, connector.connect(server_name, stream))
166            .await
167            .map_err(|_| ElectrumError::Timeout)?
168            .map_err(|e| ElectrumError::TlsError(e.to_string()))?;
169
170        Ok(Connection::Tls(Box::new(BufReader::new(tls_stream))))
171    }
172
173    /// Send a JSON-RPC request and receive response.
174    pub async fn request(
175        &self,
176        id: u64,
177        method: &str,
178        params: Vec<serde_json::Value>,
179    ) -> Result<serde_json::Value> {
180        let request = JsonRpcRequest::new(id, method, params);
181        let request_json = serde_json::to_string(&request)?;
182
183        let mut conn = self.connection.lock().await;
184
185        // Send request with newline delimiter
186        let request_line = format!("{}\n", request_json);
187
188        match &mut *conn {
189            Connection::Tcp(reader) => {
190                reader.get_mut().write_all(request_line.as_bytes()).await?;
191                reader.get_mut().flush().await?;
192            }
193            Connection::Tls(reader) => {
194                reader.get_mut().write_all(request_line.as_bytes()).await?;
195                reader.get_mut().flush().await?;
196            }
197        }
198
199        // Read response line
200        let mut response_line = String::new();
201
202        let bytes_read = tokio::time::timeout(self.config.timeout, async {
203            match &mut *conn {
204                Connection::Tcp(reader) => reader.read_line(&mut response_line).await,
205                Connection::Tls(reader) => reader.read_line(&mut response_line).await,
206            }
207        })
208        .await
209        .map_err(|_| ElectrumError::Timeout)??;
210
211        if bytes_read == 0 {
212            return Err(ElectrumError::Disconnected);
213        }
214
215        // Parse response
216        let response: JsonRpcResponse = serde_json::from_str(&response_line)?;
217
218        // Check for errors
219        if let Some(error) = response.error {
220            return Err(ElectrumError::ServerError {
221                code: error.code,
222                message: error.message,
223            });
224        }
225
226        // Verify ID matches
227        if let Some(resp_id) = response.id {
228            if resp_id != id {
229                return Err(ElectrumError::IdMismatch {
230                    expected: id,
231                    got: resp_id,
232                });
233            }
234        }
235
236        // Return result (null is valid for some methods like ping)
237        Ok(response.result.unwrap_or(serde_json::Value::Null))
238    }
239
240    /// Send a batch of JSON-RPC requests.
241    pub async fn batch_request(
242        &self,
243        requests: Vec<(u64, &str, Vec<serde_json::Value>)>,
244    ) -> Result<Vec<serde_json::Value>> {
245        if requests.is_empty() {
246            return Ok(vec![]);
247        }
248
249        let batch: Vec<JsonRpcRequest> = requests
250            .iter()
251            .map(|(id, method, params)| JsonRpcRequest::new(*id, method, params.clone()))
252            .collect();
253
254        let request_json = serde_json::to_string(&batch)?;
255
256        let mut conn = self.connection.lock().await;
257
258        // Send batch request
259        let request_line = format!("{}\n", request_json);
260
261        match &mut *conn {
262            Connection::Tcp(reader) => {
263                reader.get_mut().write_all(request_line.as_bytes()).await?;
264                reader.get_mut().flush().await?;
265            }
266            Connection::Tls(reader) => {
267                reader.get_mut().write_all(request_line.as_bytes()).await?;
268                reader.get_mut().flush().await?;
269            }
270        }
271
272        // Read response
273        let mut response_line = String::new();
274
275        let bytes_read = tokio::time::timeout(self.config.timeout, async {
276            match &mut *conn {
277                Connection::Tcp(reader) => reader.read_line(&mut response_line).await,
278                Connection::Tls(reader) => reader.read_line(&mut response_line).await,
279            }
280        })
281        .await
282        .map_err(|_| ElectrumError::Timeout)??;
283
284        if bytes_read == 0 {
285            return Err(ElectrumError::Disconnected);
286        }
287
288        // Parse batch response
289        let responses: Vec<JsonRpcResponse> = serde_json::from_str(&response_line)?;
290
291        // Extract results in order
292        let mut results = Vec::with_capacity(responses.len());
293        for response in responses {
294            if let Some(error) = response.error {
295                return Err(ElectrumError::ServerError {
296                    code: error.code,
297                    message: error.message,
298                });
299            }
300            results.push(
301                response
302                    .result
303                    .ok_or_else(|| ElectrumError::InvalidResponse("Missing result".into()))?,
304            );
305        }
306
307        Ok(results)
308    }
309
310    /// Get the current configuration.
311    pub fn config(&self) -> &ClientConfig {
312        &self.config
313    }
314}