electrum-client-netagnostic 0.21.2

Electrum client library that is network agnostic. Supports plaintext, TLS, WebSocket and Onion servers.
Documentation
//! WebSocket wrapper for Electrum protocol
//!
//! This module provides a wrapper around tungstenite's WebSocket that implements
//! `Read` and `Write` traits, adapting the message-based WebSocket protocol to
//! the stream-based interface expected by `RawClient`.

use std::io::{self, Read, Write};

use tungstenite::{Message, WebSocket};

/// Adapter that exposes a message-based WebSocket connection through the
/// byte-stream `Read` and `Write` traits.
///
/// The Electrum protocol over WebSocket sends each JSON-RPC message as a text frame.
/// This wrapper:
/// - On write: buffers bytes until a newline is seen, then sends the line
///   (without its newline) as one WebSocket text message
/// - On read: receives text or binary WebSocket messages and appends a newline
///   when one is missing, for `BufReader::read_line` compatibility
#[derive(Debug)]
pub struct WebSocketWrapper<S> {
    /// The wrapped tungstenite WebSocket connection
    socket: WebSocket<S>,
    /// Buffer for incoming data (bytes of a received message not yet handed to the reader)
    read_buffer: Vec<u8>,
    /// Buffer for outgoing data (bytes written so far that are not yet terminated by a newline)
    write_buffer: Vec<u8>,
}

impl<S> WebSocketWrapper<S> {
    /// Create a new WebSocketWrapper from an existing WebSocket connection
    pub fn new(socket: WebSocket<S>) -> Self {
        Self {
            socket,
            read_buffer: Vec::new(),
            write_buffer: Vec::new(),
        }
    }

    /// Get a reference to the underlying WebSocket
    pub fn get_ref(&self) -> &WebSocket<S> {
        &self.socket
    }

    /// Get a mutable reference to the underlying WebSocket
    pub fn get_mut(&mut self) -> &mut WebSocket<S> {
        &mut self.socket
    }
}

impl<S: Read + Write> Read for WebSocketWrapper<S> {
    fn read(&mut self, buf: &mut [u8]) -> io::Result<usize> {
        // If we have buffered data, return from that first
        if !self.read_buffer.is_empty() {
            let len = std::cmp::min(buf.len(), self.read_buffer.len());
            buf[..len].copy_from_slice(&self.read_buffer[..len]);
            self.read_buffer.drain(..len);
            return Ok(len);
        }

        // Otherwise, read the next WebSocket message
        loop {
            match self.socket.read() {
                Ok(Message::Text(text)) => {
                    // Convert to bytes and append newline for BufReader::read_line compatibility
                    let text_bytes: &[u8] = text.as_ref();
                    let mut data = text_bytes.to_vec();
                    if !data.ends_with(b"\n") {
                        data.push(b'\n');
                    }

                    let len = std::cmp::min(buf.len(), data.len());
                    buf[..len].copy_from_slice(&data[..len]);

                    // Buffer any remaining data
                    if data.len() > len {
                        self.read_buffer.extend_from_slice(&data[len..]);
                    }

                    return Ok(len);
                }
                Ok(Message::Binary(data)) => {
                    // Handle binary messages the same way
                    let data_bytes: &[u8] = data.as_ref();
                    let mut data = data_bytes.to_vec();
                    if !data.ends_with(b"\n") {
                        data.push(b'\n');
                    }

                    let len = std::cmp::min(buf.len(), data.len());
                    buf[..len].copy_from_slice(&data[..len]);

                    if data.len() > len {
                        self.read_buffer.extend_from_slice(&data[len..]);
                    }

                    return Ok(len);
                }
                Ok(Message::Ping(data)) => {
                    // Respond to ping with pong and continue reading
                    let _ = self.socket.write(Message::Pong(data));
                    let _ = self.socket.flush();
                    continue;
                }
                Ok(Message::Pong(_)) => {
                    // Ignore pong messages, continue reading
                    continue;
                }
                Ok(Message::Close(_)) => {
                    // Connection closed
                    return Ok(0);
                }
                Ok(Message::Frame(_)) => {
                    // Raw frame, ignore and continue
                    continue;
                }
                Err(tungstenite::Error::Io(e)) => {
                    return Err(e);
                }
                Err(e) => {
                    return Err(io::Error::new(io::ErrorKind::Other, e.to_string()));
                }
            }
        }
    }
}

impl<S: Read + Write> Write for WebSocketWrapper<S> {
    fn write(&mut self, buf: &[u8]) -> io::Result<usize> {
        // Buffer the data
        self.write_buffer.extend_from_slice(buf);

        // Process ALL complete messages (ending with newline) in the buffer
        while let Some(newline_pos) = self.write_buffer.iter().position(|&b| b == b'\n') {
            // Extract the message (without the newline)
            let message: Vec<u8> = self.write_buffer.drain(..=newline_pos).collect();
            let message_str = String::from_utf8_lossy(&message[..message.len() - 1]);

            // Send as WebSocket text message
            self.socket
                .write(Message::Text(message_str.into_owned().into()))
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
        }

        Ok(buf.len())
    }

    fn flush(&mut self) -> io::Result<()> {
        // If there's any remaining data in the write buffer without a newline,
        // send it anyway (this handles edge cases)
        if !self.write_buffer.is_empty() {
            let message = std::mem::take(&mut self.write_buffer);
            let message_str = String::from_utf8_lossy(&message);

            self.socket
                .write(Message::Text(message_str.into_owned().into()))
                .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))?;
        }

        self.socket
            .flush()
            .map_err(|e| io::Error::new(io::ErrorKind::Other, e.to_string()))
    }
}