opt-in-miner 0.4.1

Opt-in Monero/Wownero mining library for transparent application monetization
Documentation
mod request;
mod response;

use request::{KeepalivedParams, LoginParams, Request, SubmitParams};
use response::{JobNotification, LoginResult, Response, StatusResult};

use std::{
    io::{Read, Write},
    net::TcpStream,
    sync::{
        Arc,
        mpsc::{self, Receiver, Sender},
    },
    thread,
};

use rustls::{
    ClientConfig, ClientConnection, StreamOwned,
    client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
    pki_types::{CertificateDer, ServerName, UnixTime},
};

use crate::job::Job;

#[derive(Debug, thiserror::Error)]
pub enum Error {
    #[error("{0}")]
    Io(#[from] std::io::Error),
    #[error("{0}")]
    Json(#[from] serde_json::Error),
    #[error("login failed: {0}")]
    LoginFailed(String),
    #[error("pool error: {code} – {message}")]
    Pool { code: i64, message: String },
    #[error("TLS error: {0}")]
    Tls(#[from] rustls::Error),
}

type TlsStream = StreamOwned<ClientConnection, TcpStream>;

pub struct PoolSource {
    write_sender: Sender<Vec<u8>>,
    id: String,
    job_receiver: Receiver<Job>,
    #[allow(dead_code)]
    response_receiver: Receiver<Response<StatusResult>>,
    request_id: u64,
}

impl PoolSource {
    pub fn login(url: &str, wallet: &str, pass: &str) -> Result<(Self, Job), Error> {
        let (host, _port) = parse_host_port(url);
        let mut stream = connect_tls(url, &host)?;

        let login_request = Request {
            id: 1u64,
            method: "login",
            params: LoginParams::new(wallet, pass),
        };
        send(&mut stream, &login_request)?;
        let response: Response<LoginResult> = recv(&mut stream)?;

        if let Some(error) = response.error {
            return Err(Error::Pool {
                code: error.code,
                message: error.message,
            });
        }

        let result = response
            .result
            .ok_or_else(|| Error::LoginFailed("no result in response".into()))?;
        let id = result.id;
        let initial_job = convert_job(&result.job);

        let (job_sender, job_receiver) = mpsc::channel();
        let (response_sender, response_receiver) = mpsc::channel();
        let (write_sender, write_receiver) = mpsc::channel::<Vec<u8>>();

        thread::spawn(move || io_thread(stream, write_receiver, job_sender, response_sender));

        Ok((
            Self {
                write_sender,
                id,
                job_receiver,
                response_receiver,
                request_id: 2,
            },
            initial_job,
        ))
    }

    pub fn submit(&mut self, job_id: &str, nonce: &str, hash: &str) -> Result<(), Error> {
        let request = Request {
            id: self.next_id(),
            method: "submit",
            params: SubmitParams::new(&self.id, job_id, nonce, hash),
        };
        self.send_via_channel(&request)
    }

    pub fn keepalive(&mut self) -> Result<(), Error> {
        let request = Request {
            id: self.next_id(),
            method: "keepalived",
            params: KeepalivedParams::new(&self.id),
        };
        self.send_via_channel(&request)
    }

    pub fn try_receive_job(&self) -> Option<Job> {
        self.job_receiver.try_recv().ok()
    }

    fn send_via_channel(&self, request: &Request<impl serde::Serialize>) -> Result<(), Error> {
        let mut data = serde_json::to_vec(request)?;
        data.push(b'\n');
        self.write_sender
            .send(data)
            .map_err(|_| std::io::Error::other("io thread gone"))?;
        Ok(())
    }

    fn next_id(&mut self) -> u64 {
        let id = self.request_id;
        self.request_id += 1;
        id
    }
}

fn io_thread(
    mut stream: TlsStream,
    write_receiver: Receiver<Vec<u8>>,
    job_sender: Sender<Job>,
    response_sender: Sender<Response<StatusResult>>,
) {
    stream
        .sock
        .set_read_timeout(Some(std::time::Duration::from_millis(100)))
        .ok();

    let mut read_buffer = Vec::new();

    loop {
        while let Ok(data) = write_receiver.try_recv() {
            if stream.write_all(&data).is_err() || stream.flush().is_err() {
                return;
            }
        }

        let mut buf = [0u8; 4096];
        match stream.read(&mut buf) {
            Ok(0) => return,
            Ok(n) => read_buffer.extend_from_slice(&buf[..n]),
            Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => {}
            Err(ref error) if error.kind() == std::io::ErrorKind::TimedOut => {}
            Err(_) => return,
        }

        while let Some(newline_position) = read_buffer.iter().position(|&b| b == b'\n') {
            let line: String = read_buffer
                .drain(..=newline_position)
                .map(|b| b as char)
                .collect();
            let line = line.trim();
            if line.is_empty() {
                continue;
            }
            if let Ok(notification) = serde_json::from_str::<JobNotification>(line) {
                let _ = job_sender.send(convert_job(&notification.params));
                continue;
            }
            if let Ok(job_response) = serde_json::from_str::<Response<LoginResult>>(line)
                && let Some(result) = job_response.result
            {
                let _ = job_sender.send(convert_job(&result.job));
                continue;
            }
            if let Ok(response) = serde_json::from_str::<Response<StatusResult>>(line) {
                let _ = response_sender.send(response);
            }
        }
    }
}

fn connect_tls(address: &str, host: &str) -> Result<TlsStream, Error> {
    let tcp = TcpStream::connect(address)?;
    let config = Arc::new(
        ClientConfig::builder()
            .dangerous()
            .with_custom_certificate_verifier(Arc::new(NoVerification))
            .with_no_client_auth(),
    );
    let server_name = host
        .to_string()
        .try_into()
        .map_err(|_| std::io::Error::other("invalid server name"))?;
    let connection = ClientConnection::new(config, server_name)?;
    Ok(StreamOwned::new(connection, tcp))
}

#[derive(Debug)]
struct NoVerification;

impl ServerCertVerifier for NoVerification {
    fn verify_server_cert(
        &self,
        _end_entity: &CertificateDer<'_>,
        _intermediates: &[CertificateDer<'_>],
        _server_name: &ServerName<'_>,
        _ocsp_response: &[u8],
        _now: UnixTime,
    ) -> Result<ServerCertVerified, rustls::Error> {
        Ok(ServerCertVerified::assertion())
    }

    fn verify_tls12_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, rustls::Error> {
        Ok(HandshakeSignatureValid::assertion())
    }

    fn verify_tls13_signature(
        &self,
        _message: &[u8],
        _cert: &CertificateDer<'_>,
        _dss: &rustls::DigitallySignedStruct,
    ) -> Result<HandshakeSignatureValid, rustls::Error> {
        Ok(HandshakeSignatureValid::assertion())
    }

    fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
        rustls::crypto::aws_lc_rs::default_provider()
            .signature_verification_algorithms
            .supported_schemes()
    }
}

fn parse_host_port(address: &str) -> (String, u16) {
    if let Some((host, port_str)) = address.rsplit_once(':')
        && let Ok(port) = port_str.parse()
    {
        return (host.into(), port);
    }
    (address.into(), 3333)
}

fn convert_job(raw: &response::RawJob) -> Job {
    let target_bytes = hex::decode(&raw.target).unwrap_or_default();
    let threshold = match target_bytes.len() {
        4 => {
            let mut buf = [0u8; 4];
            buf.copy_from_slice(&target_bytes);
            (u32::from_le_bytes(buf) as u64) << 32
        }
        8 => {
            let mut buf = [0u8; 8];
            buf.copy_from_slice(&target_bytes);
            u64::from_le_bytes(buf)
        }
        _ => 1,
    };

    Job {
        id: raw.id.clone(),
        hashing_blob: hex::decode(&raw.blob).unwrap_or_default(),
        seed: hex::decode(&raw.seed).unwrap_or_default(),
        threshold,
        template_blob: None,
    }
}

fn send(stream: &mut TlsStream, request: &Request<impl serde::Serialize>) -> Result<(), Error> {
    let mut data = serde_json::to_vec(request)?;
    data.push(b'\n');
    stream.write_all(&data)?;
    stream.flush()?;
    Ok(())
}

fn recv<R: serde::de::DeserializeOwned>(stream: &mut TlsStream) -> Result<Response<R>, Error> {
    let mut line = String::new();
    let mut buf = [0u8; 1];
    loop {
        stream.read_exact(&mut buf)?;
        if buf[0] == b'\n' {
            break;
        }
        line.push(buf[0] as char);
    }
    Ok(serde_json::from_str(&line)?)
}