mod request;
mod response;
use request::{KeepalivedParams, LoginParams, Request, SubmitParams};
use response::{JobNotification, LoginResult, Response, StatusResult};
use std::{
io::{Read, Write},
net::TcpStream,
sync::{
Arc,
mpsc::{self, Receiver, Sender},
},
thread,
};
use rustls::{
ClientConfig, ClientConnection, StreamOwned,
client::danger::{HandshakeSignatureValid, ServerCertVerified, ServerCertVerifier},
pki_types::{CertificateDer, ServerName, UnixTime},
};
use crate::job::Job;
#[derive(Debug, thiserror::Error)]
pub enum Error {
#[error("{0}")]
Io(#[from] std::io::Error),
#[error("{0}")]
Json(#[from] serde_json::Error),
#[error("login failed: {0}")]
LoginFailed(String),
#[error("pool error: {code} – {message}")]
Pool { code: i64, message: String },
#[error("TLS error: {0}")]
Tls(#[from] rustls::Error),
}
type TlsStream = StreamOwned<ClientConnection, TcpStream>;
pub struct PoolSource {
write_sender: Sender<Vec<u8>>,
id: String,
job_receiver: Receiver<Job>,
#[allow(dead_code)]
response_receiver: Receiver<Response<StatusResult>>,
request_id: u64,
}
impl PoolSource {
pub fn login(url: &str, wallet: &str, pass: &str) -> Result<(Self, Job), Error> {
let (host, _port) = parse_host_port(url);
let mut stream = connect_tls(url, &host)?;
let login_request = Request {
id: 1u64,
method: "login",
params: LoginParams::new(wallet, pass),
};
send(&mut stream, &login_request)?;
let response: Response<LoginResult> = recv(&mut stream)?;
if let Some(error) = response.error {
return Err(Error::Pool {
code: error.code,
message: error.message,
});
}
let result = response
.result
.ok_or_else(|| Error::LoginFailed("no result in response".into()))?;
let id = result.id;
let initial_job = convert_job(&result.job);
let (job_sender, job_receiver) = mpsc::channel();
let (response_sender, response_receiver) = mpsc::channel();
let (write_sender, write_receiver) = mpsc::channel::<Vec<u8>>();
thread::spawn(move || io_thread(stream, write_receiver, job_sender, response_sender));
Ok((
Self {
write_sender,
id,
job_receiver,
response_receiver,
request_id: 2,
},
initial_job,
))
}
pub fn submit(&mut self, job_id: &str, nonce: &str, hash: &str) -> Result<(), Error> {
let request = Request {
id: self.next_id(),
method: "submit",
params: SubmitParams::new(&self.id, job_id, nonce, hash),
};
self.send_via_channel(&request)
}
pub fn keepalive(&mut self) -> Result<(), Error> {
let request = Request {
id: self.next_id(),
method: "keepalived",
params: KeepalivedParams::new(&self.id),
};
self.send_via_channel(&request)
}
pub fn try_receive_job(&self) -> Option<Job> {
self.job_receiver.try_recv().ok()
}
fn send_via_channel(&self, request: &Request<impl serde::Serialize>) -> Result<(), Error> {
let mut data = serde_json::to_vec(request)?;
data.push(b'\n');
self.write_sender
.send(data)
.map_err(|_| std::io::Error::other("io thread gone"))?;
Ok(())
}
fn next_id(&mut self) -> u64 {
let id = self.request_id;
self.request_id += 1;
id
}
}
fn io_thread(
mut stream: TlsStream,
write_receiver: Receiver<Vec<u8>>,
job_sender: Sender<Job>,
response_sender: Sender<Response<StatusResult>>,
) {
stream
.sock
.set_read_timeout(Some(std::time::Duration::from_millis(100)))
.ok();
let mut read_buffer = Vec::new();
loop {
while let Ok(data) = write_receiver.try_recv() {
if stream.write_all(&data).is_err() || stream.flush().is_err() {
return;
}
}
let mut buf = [0u8; 4096];
match stream.read(&mut buf) {
Ok(0) => return,
Ok(n) => read_buffer.extend_from_slice(&buf[..n]),
Err(ref error) if error.kind() == std::io::ErrorKind::WouldBlock => {}
Err(ref error) if error.kind() == std::io::ErrorKind::TimedOut => {}
Err(_) => return,
}
while let Some(newline_position) = read_buffer.iter().position(|&b| b == b'\n') {
let line: String = read_buffer
.drain(..=newline_position)
.map(|b| b as char)
.collect();
let line = line.trim();
if line.is_empty() {
continue;
}
if let Ok(notification) = serde_json::from_str::<JobNotification>(line) {
let _ = job_sender.send(convert_job(¬ification.params));
continue;
}
if let Ok(job_response) = serde_json::from_str::<Response<LoginResult>>(line)
&& let Some(result) = job_response.result
{
let _ = job_sender.send(convert_job(&result.job));
continue;
}
if let Ok(response) = serde_json::from_str::<Response<StatusResult>>(line) {
let _ = response_sender.send(response);
}
}
}
}
fn connect_tls(address: &str, host: &str) -> Result<TlsStream, Error> {
let tcp = TcpStream::connect(address)?;
let config = Arc::new(
ClientConfig::builder()
.dangerous()
.with_custom_certificate_verifier(Arc::new(NoVerification))
.with_no_client_auth(),
);
let server_name = host
.to_string()
.try_into()
.map_err(|_| std::io::Error::other("invalid server name"))?;
let connection = ClientConnection::new(config, server_name)?;
Ok(StreamOwned::new(connection, tcp))
}
#[derive(Debug)]
struct NoVerification;
impl ServerCertVerifier for NoVerification {
fn verify_server_cert(
&self,
_end_entity: &CertificateDer<'_>,
_intermediates: &[CertificateDer<'_>],
_server_name: &ServerName<'_>,
_ocsp_response: &[u8],
_now: UnixTime,
) -> Result<ServerCertVerified, rustls::Error> {
Ok(ServerCertVerified::assertion())
}
fn verify_tls12_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn verify_tls13_signature(
&self,
_message: &[u8],
_cert: &CertificateDer<'_>,
_dss: &rustls::DigitallySignedStruct,
) -> Result<HandshakeSignatureValid, rustls::Error> {
Ok(HandshakeSignatureValid::assertion())
}
fn supported_verify_schemes(&self) -> Vec<rustls::SignatureScheme> {
rustls::crypto::aws_lc_rs::default_provider()
.signature_verification_algorithms
.supported_schemes()
}
}
fn parse_host_port(address: &str) -> (String, u16) {
if let Some((host, port_str)) = address.rsplit_once(':')
&& let Ok(port) = port_str.parse()
{
return (host.into(), port);
}
(address.into(), 3333)
}
fn convert_job(raw: &response::RawJob) -> Job {
let target_bytes = hex::decode(&raw.target).unwrap_or_default();
let threshold = match target_bytes.len() {
4 => {
let mut buf = [0u8; 4];
buf.copy_from_slice(&target_bytes);
(u32::from_le_bytes(buf) as u64) << 32
}
8 => {
let mut buf = [0u8; 8];
buf.copy_from_slice(&target_bytes);
u64::from_le_bytes(buf)
}
_ => 1,
};
Job {
id: raw.id.clone(),
hashing_blob: hex::decode(&raw.blob).unwrap_or_default(),
seed: hex::decode(&raw.seed).unwrap_or_default(),
threshold,
template_blob: None,
}
}
fn send(stream: &mut TlsStream, request: &Request<impl serde::Serialize>) -> Result<(), Error> {
let mut data = serde_json::to_vec(request)?;
data.push(b'\n');
stream.write_all(&data)?;
stream.flush()?;
Ok(())
}
fn recv<R: serde::de::DeserializeOwned>(stream: &mut TlsStream) -> Result<Response<R>, Error> {
let mut line = String::new();
let mut buf = [0u8; 1];
loop {
stream.read_exact(&mut buf)?;
if buf[0] == b'\n' {
break;
}
line.push(buf[0] as char);
}
Ok(serde_json::from_str(&line)?)
}