use async_std::net::{SocketAddr, TcpStream};
use async_std::sync::{Arc, Mutex, RwLock};
use chrono::Utc;
use futures::io::BufReader;
use futures::io::{AsyncBufReadExt, AsyncWriteExt};
use futures::io::{ReadHalf, WriteHalf};
use log::info;
use serde::Serialize;
use std::time::SystemTime;
use stratum_types::params::{ClientParam, PoolParam};
use stratum_types::traits::{
AuthManager, Authorize, AuthorizeResult, BlockValidator, DataProvider, Notify, StratumManager,
SubscribeResult,
};
use stratum_types::traits::{PoolParams, StratumParams};
use stratum_types::Result;
use stratum_types::{
ClientPacket, Error, MinerAuth, MinerInfo, MinerJobStats, Request, Response, StratumError,
StratumMethod,
};
use uuid::Uuid;
/// Lifecycle state of a [`Connection`].
///
/// `Connection::start` checks this before reading each message and leaves its
/// loop once the state is [`State::Disconnect`].
#[derive(PartialEq, Debug)]
pub enum State {
/// State assigned by `Connection::new`; the read loop keeps running.
Connected,
/// Set by `Connection::shutdown`; the read loop stops at its next check.
Disconnect,
}
/// Per-miner connection state for a Stratum pool server.
///
/// Holds both halves of the miner's TCP stream, the pool services taken from
/// the [`StratumManager`] implementation, and the mutable bookkeeping used for
/// authorization, subscription and variable-difficulty retargeting.
#[derive(Debug)]
pub struct Connection<SM: StratumManager> {
/// Connection identifier. `new` fills it with a freshly generated UUID v4
/// string; it is also used as the `id` of every outgoing request/response.
pub id: String,
/// Write side of the TCP stream; locked for the duration of each outgoing message.
pub write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
/// Buffered read side of the TCP stream; locked while a message is being read.
pub read_half: Arc<Mutex<BufReader<ReadHalf<TcpStream>>>>,
/// Starts `false`; set to `true` once the auth manager reports a successful authorize.
pub authorized: Arc<Mutex<bool>>,
/// Source of jobs (`get_job`) sent to the miner.
pub data_provider: Arc<SM::DataProvider>,
/// Validates shares submitted by the miner (`validate_share`).
pub block_validator: Arc<SM::BlockValidator>,
/// Handles `authorize` and `subscribe` requests.
pub auth_manager: Arc<SM::AuthManager>,
/// Wall-clock time captured when the connection was constructed.
pub session_start: SystemTime,
/// Current lifecycle state; see [`State`].
pub state: Mutex<State>,
/// Starts `false`; set to `true` after a successful subscribe.
pub subscribed: Arc<Mutex<bool>>,
/// Subscription id returned by the auth manager on subscribe (empty until then).
pub subscriber_id: Arc<Mutex<String>>,
/// Miner metadata (ip, auth, subscription id, job stats). A clone is handed
/// to the auth manager and the block validator on each call.
pub miner_info: Arc<RwLock<MinerInfo>>,
/// Starts at the `initial_difficulty` passed to `new`. Sent to the miner on
/// subscribe and copied into the job stats whenever work is sent.
pub difficulty: Arc<Mutex<f64>>,
/// Initialised to 0 in `new`; not read or written elsewhere in the code visible here.
pub submissions: Arc<Mutex<u64>>,
/// Initialised to the construction time in `new`; not used elsewhere in the
/// code visible here (retarget timing is tracked in [`JobStats`]).
pub last_retarget: Arc<Mutex<SystemTime>>,
/// Initialised to 0.0 in `new`; not used elsewhere in the code visible here.
pub next_difficulty: Arc<Mutex<f64>>,
/// Share-timing bookkeeping used by the variable-difficulty logic.
pub job_stats: Arc<Mutex<JobStats>>,
/// Retargeting parameters; fixed at construction.
pub options: Arc<MinerOptions>,
/// Initialised to defaults in `new`; not updated in the code visible here.
pub stats: Arc<Mutex<MinerStats>>,
/// When `true`, accepted shares feed the variable-difficulty retargeting in `handle_submit`.
pub var_diff: bool,
}
/// Share-timing bookkeeping for the job a miner is currently working on.
///
/// Reset whenever work is sent and updated on every accepted share when
/// variable difficulty is enabled.
#[derive(Debug, Default)]
pub struct JobStats {
/// `Utc::now().timestamp()` (Unix seconds) of the last accepted share, or of
/// the moment work was last sent.
last_share_timestamp: i64,
/// `Utc::now().timestamp()` (Unix seconds) of the last difficulty change, or
/// of the moment work was last sent.
last_retarget: i64,
/// Seconds elapsed between consecutive accepted shares since the last reset.
times: Vec<i64>,
/// Difficulty the pool currently expects from this miner.
current_difficulty: f64,
/// Difficulty reported by the current job (`job.get_difficulty()`); used as
/// the upper bound when retargeting.
job_difficulty: f64,
}
/// Tuning parameters for variable-difficulty retargeting.
#[derive(Debug, Default)]
pub struct MinerOptions {
/// Seconds since the last retarget after which the next accepted share
/// triggers a retarget (compared against a difference of Unix timestamps).
retarget_time: i64,
/// Desired average time between shares; the retarget scales the difficulty
/// by `target_time / average_share_time`.
target_time: f64,
/// Lower bound for a retargeted difficulty.
min_diff: f64,
/// Largest difficulty change allowed in a single retarget.
max_delta: f64,
/// Not read in the code visible here. NOTE(review): the defaults set in
/// `Connection::new` place `share_time_min`/`share_time_max` at
/// `target_time` -/+ 30%, so those bounds are presumably derived from this
/// value by hand — confirm.
variance_percent: u32,
/// Lower bound of the acceptable average share time; compared against the
/// running average in `handle_submit`.
share_time_min: f64,
/// Upper bound of the acceptable average share time; compared against the
/// running average in `handle_submit`.
share_time_max: f64,
}
/// Aggregate per-miner counters.
///
/// Created with default (zero) values in `Connection::new`; none of the fields
/// are read or updated in the code visible here.
#[derive(Debug, Default)]
pub struct MinerStats {
/// NOTE(review): presumably the miner's estimated hash rate — never assigned here; confirm.
effective_hash_rate: f64,
/// NOTE(review): presumably the count of accepted shares — never incremented here; confirm.
accepted_shares: u64,
/// NOTE(review): presumably the count of rejected shares — never incremented here; confirm.
rejected_shares: u64,
}
impl<SM> Connection<SM>
where
SM: StratumManager,
{
pub fn new(
addr: SocketAddr,
rh: BufReader<ReadHalf<TcpStream>>,
wh: WriteHalf<TcpStream>,
data_provider: Arc<SM::DataProvider>,
auth_manager: Arc<SM::AuthManager>,
block_validator: Arc<SM::BlockValidator>,
var_diff: bool,
initial_difficulty: f64,
) -> Self {
let id = Uuid::new_v4().to_string();
info!("Accepting new miner. ID: {}", &id);
let info = MinerInfo {
ip: addr.ip(),
auth: None,
sid: None,
job_stats: None,
};
let options = MinerOptions {
retarget_time: 120,
target_time: 6.0,
min_diff: 0.0001,
max_delta: 1.0,
variance_percent: 30,
share_time_min: 4.2,
share_time_max: 7.8,
};
Connection {
id,
write_half: Arc::new(Mutex::new(wh)),
read_half: Arc::new(Mutex::new(rh)),
authorized: Arc::new(Mutex::new(false)),
data_provider,
auth_manager,
block_validator,
session_start: SystemTime::now(),
state: Mutex::new(State::Connected),
subscribed: Arc::new(Mutex::new(false)),
difficulty: Arc::new(Mutex::new(initial_difficulty)),
subscriber_id: Arc::new(Mutex::new(String::new())),
miner_info: Arc::new(RwLock::new(info)),
submissions: Arc::new(Mutex::new(0)),
last_retarget: Arc::new(Mutex::new(SystemTime::now())),
next_difficulty: Arc::new(Mutex::new(0.0)),
job_stats: Arc::new(Mutex::new(Default::default())),
options: Arc::new(options),
stats: Arc::new(Mutex::new(Default::default())),
var_diff,
}
}
/// Runs the connection's read loop until the state becomes [`State::Disconnect`].
///
/// Each iteration reads one packet and dispatches it to the request or
/// response handler.
///
/// # Errors
///
/// Returns the first error produced by `next_message` or by a handler; the
/// loop does not continue after an error.
pub async fn start(&self) -> Result<()> {
    // The state guard is a temporary of the condition, so it is released
    // before the loop body runs.
    while *self.state.lock().await != State::Disconnect {
        let packet: ClientPacket<SM::PoolParams, SM::StratumParams> =
            self.next_message().await?;
        match packet {
            ClientPacket::Request(request) => self.handle_requests(request).await?,
            ClientPacket::Response(response) => self.handle_responses(response).await?,
        }
    }
    Ok(())
}
/// Dispatches a client request to the handler matching its parameter kind.
///
/// # Errors
///
/// Propagates whatever error the selected handler returns.
pub async fn handle_requests(
    &self,
    req: Request<ClientParam<SM::PoolParams, SM::StratumParams>>,
) -> Result<()> {
    match &req.params {
        ClientParam::Authorize(auth) => self.handle_authorize(auth).await,
        ClientParam::Submit(share) => self.handle_submit(share).await,
        ClientParam::Subscribe(subscribe) => self.handle_subscribe(subscribe).await,
        ClientParam::Unknown(raw) => self.handle_unknown(raw).await,
    }
}
/// Handles a response packet sent by the client.
///
/// Responses are currently ignored: the authorization flag is locked, but the
/// function returns `Ok(())` whether or not the miner is authorized.
pub async fn handle_responses(
    &self,
    _res: Response<ClientParam<SM::PoolParams, SM::StratumParams>>,
) -> Result<()> {
    // Held until return; no branch acts on the flag's value yet.
    let _authorized = self.authorized.lock().await;
    Ok(())
}
/// Reads the next newline-terminated JSON packet from the miner.
///
/// Lines that are empty after trimming whitespace are skipped, so a stray
/// blank line does not surface as a JSON parse error. The read half stays
/// locked until a packet is returned or an error occurs.
///
/// # Errors
///
/// * `Error::StreamClosed` when the peer closed the stream (zero-byte read);
///   the connection is marked as disconnected first.
/// * Any I/O error from the underlying read.
/// * A deserialization error when a non-blank line is not a valid packet.
pub async fn next_message(&self) -> Result<ClientPacket<SM::PoolParams, SM::StratumParams>> {
    let mut stream = self.read_half.lock().await;
    let mut buf = String::new();
    loop {
        // `read_line` appends, so the buffer is emptied before each read.
        buf.clear();
        let num_bytes = stream.read_line(&mut buf).await?;
        if num_bytes == 0 {
            self.shutdown().await?;
            return Err(Error::StreamClosed);
        }
        // Trim before testing for emptiness: a line holding only "\n" is
        // non-empty as read but carries no packet.
        let line = buf.trim();
        if line.is_empty() {
            continue;
        }
        let msg: ClientPacket<SM::PoolParams, SM::StratumParams> = serde_json::from_str(line)?;
        return Ok(msg);
    }
}
/// Serializes `message` to JSON and writes it to the miner, followed by a
/// single `\n` terminator.
///
/// Serialization happens before the write half is locked; the lock is then
/// held for both writes.
///
/// # Errors
///
/// Returns the serialization error or the first I/O error encountered.
async fn send<T: Serialize>(&self, message: T) -> Result<()> {
    let encoded = serde_json::to_vec(&message)?;
    let mut writer = self.write_half.lock().await;
    writer.write_all(&encoded).await?;
    writer.write_all(b"\n").await?;
    Ok(())
}
/// Sends a pool-initiated request with the given method and parameters.
///
/// The request's `id` is a copy of this connection's id.
async fn send_request(
    &self,
    method: StratumMethod,
    params: PoolParam<SM::PoolParams, SM::StratumParams>,
) -> Result<()> {
    self.send(Request {
        id: self.id.clone(),
        method,
        params,
    })
    .await
}
/// Sends a successful response carrying `result` and no error.
///
/// The response's `id` is a copy of this connection's id.
async fn send_response(
    &self,
    method: StratumMethod,
    result: PoolParam<SM::PoolParams, SM::StratumParams>,
) -> Result<()> {
    self.send(Response {
        id: self.id.clone(),
        method,
        result: Some(result),
        error: None,
    })
    .await
}
/// Sends an error response carrying `error` and no result.
///
/// The response's `id` is a copy of this connection's id.
async fn send_error(&self, method: StratumMethod, error: StratumError) -> Result<()> {
    // The result type cannot be inferred from a `None`, so it is spelled out.
    let failure: Response<PoolParam<SM::PoolParams, SM::StratumParams>> = Response {
        id: self.id.clone(),
        method,
        result: None,
        error: Some(error),
    };
    self.send(failure).await
}
/// Marks the connection as disconnected so the `start` loop exits at its next
/// state check. Does not close the stream itself.
pub async fn shutdown(&self) -> Result<()> {
    let mut state = self.state.lock().await;
    *state = State::Disconnect;
    Ok(())
}
/// Processes an authorize request.
///
/// The auth manager is asked to authorize the miner using a clone of the
/// current miner info. Its result is always sent back to the miner. On
/// success the connection is flagged as authorized and the miner's
/// credentials are recorded; on refusal the connection is shut down. If the
/// auth manager itself fails, only an error response is sent.
///
/// # Errors
///
/// Propagates I/O or serialization errors from sending the reply.
pub async fn handle_authorize(
    &self,
    auth: &<SM::PoolParams as PoolParams>::Authorize,
) -> Result<()> {
    let outcome = self
        .auth_manager
        .authorize(self.miner_info.read().await.clone(), &auth)
        .await;

    let auth_result = match outcome {
        Ok(granted) => granted,
        Err(e) => return self.send_error(StratumMethod::Authorize, e).await,
    };

    // The miner is told the outcome before any local state changes.
    self.send_response(
        StratumMethod::Authorize,
        PoolParam::AuthorizeResult(auth_result.clone()),
    )
    .await?;

    if !auth_result.authorized() {
        return self.shutdown().await;
    }

    info!("Authorized Miner: {}", &self.id);
    *self.authorized.lock().await = true;
    let credentials = MinerAuth {
        username: auth.username(),
        id: auth_result.id(),
    };
    self.miner_info.write().await.auth = Some(credentials);
    Ok(())
}
pub async fn send_initial_work(&self) -> Result<()> {
let job = self.data_provider.get_job().await;
let mut job_stats = self.job_stats.lock().await;
job_stats.last_share_timestamp = Utc::now().timestamp();
job_stats.last_retarget = Utc::now().timestamp();
job_stats.times = Vec::new();
job_stats.job_difficulty = job.get_difficulty();
job_stats.current_difficulty = *self.difficulty.lock().await;
let info_job_stats = MinerJobStats {
expected_difficulty: job_stats.current_difficulty,
};
self.miner_info.write().await.job_stats = Some(info_job_stats);
self.send_request(StratumMethod::Notify, PoolParam::Notify(job))
.await?;
Ok(())
}
pub async fn send_work(&self) -> Result<()> {
let job = self.data_provider.get_job().await;
let job_difficulty = job.get_difficulty();
self.send_request(StratumMethod::Notify, PoolParam::Notify(job))
.await?;
let mut job_stats = self.job_stats.lock().await;
job_stats.last_share_timestamp = Utc::now().timestamp();
job_stats.last_retarget = Utc::now().timestamp();
job_stats.times = Vec::new();
job_stats.job_difficulty = job_difficulty;
job_stats.current_difficulty = *self.difficulty.lock().await;
let info_job_stats = MinerJobStats {
expected_difficulty: job_stats.current_difficulty,
};
self.miner_info.write().await.job_stats = Some(info_job_stats);
Ok(())
}
/// Processes a share submitted by the miner.
///
/// The share is validated by the block validator using a clone of the
/// current miner info. A rejected share is logged and answered with an error
/// response. An accepted share is logged and acknowledged; when variable
/// difficulty is enabled, the time since the previous share is also recorded
/// and a retarget is triggered if either
///
/// * at least `retarget_time` seconds have passed since the last retarget, or
/// * the running average share time lies outside
///   `[share_time_min, share_time_max]`.
///
/// # Errors
///
/// Propagates I/O or serialization errors from sending replies, and any
/// error returned by the retarget.
pub async fn handle_submit(
    &self,
    share: &<SM::StratumParams as StratumParams>::Submit,
) -> Result<()> {
    info!("Received share from miner: {}", &self.id);
    let result = self
        .block_validator
        .validate_share(self.miner_info.read().await.clone(), share.clone())
        .await;

    let valid = match result {
        Ok(valid) => valid,
        Err(e) => {
            info!("Rejecting share from miner: {}. Reason: {}", &self.id, e);
            return self.send_error(StratumMethod::Submit, e).await;
        }
    };

    info!("Accepted share from miner: {}", &self.id);
    self.send_response(StratumMethod::Submit, PoolParam::SubmitResult(valid))
        .await?;

    if !self.var_diff {
        return Ok(());
    }

    let mut job_stats = self.job_stats.lock().await;
    let now = Utc::now().timestamp();
    let duration_since_last_share = now - job_stats.last_share_timestamp;
    job_stats.times.push(duration_since_last_share);
    job_stats.last_share_timestamp = now;

    // `times` holds at least the entry pushed above, so the division is safe.
    let time_total: i64 = job_stats.times.iter().sum();
    let avg = time_total as f64 / job_stats.times.len() as f64;

    let window_elapsed = now - job_stats.last_retarget >= self.options.retarget_time;
    // "Outside the band" means below the minimum OR above the maximum; an AND
    // of the two can never hold.
    let outside_band =
        avg < self.options.share_time_min || avg > self.options.share_time_max;

    if window_elapsed || outside_band {
        self.retarget(avg, &mut job_stats).await?;
    }
    Ok(())
}
/// Processes a subscribe request.
///
/// The auth manager is asked to subscribe the miner using a clone of the
/// current miner info. On success the subscription id is recorded, the
/// subscribe result is sent back, and the miner then receives the current
/// difficulty followed by its first job. On failure only an error response
/// is sent.
///
/// # Errors
///
/// Propagates I/O or serialization errors from any of the sends.
pub async fn handle_subscribe(
    &self,
    subscribe: &<SM::PoolParams as PoolParams>::Subscribe,
) -> Result<()> {
    let outcome = self
        .auth_manager
        .subscribe(self.miner_info.read().await.clone(), subscribe)
        .await;

    let sub_info = match outcome {
        Ok(granted) => granted,
        Err(e) => return self.send_error(StratumMethod::Subscribe, e).await,
    };

    // Local state is recorded before the miner is told it is subscribed.
    *self.subscriber_id.lock().await = sub_info.id();
    *self.subscribed.lock().await = true;
    self.miner_info.write().await.sid = Some(sub_info.id());

    self.send_response(
        StratumMethod::Subscribe,
        PoolParam::SubscribeResult(sub_info),
    )
    .await?;

    self.set_difficulty(*self.difficulty.lock().await).await?;
    self.send_initial_work().await
}
/// Sends a `SetDifficulty` request telling the miner to use `difficulty`.
///
/// Only sends the message; no local state is changed.
async fn set_difficulty(&self, difficulty: f64) -> Result<()> {
    let params = PoolParam::SetDifficulty(difficulty);
    self.send_request(StratumMethod::SetDifficulty, params).await
}
/// Recomputes the miner's difficulty from its average share time.
///
/// The current difficulty is scaled by `target_time / avg`, limited to a
/// change of at most `max_delta` in either direction, then bounded below by
/// `min_diff` and above by the job's difficulty. If the result differs from
/// the current difficulty, the share-timing window is restarted, the new
/// value is stored (job stats, connection difficulty and miner info) and a
/// `SetDifficulty` request is sent to the miner.
///
/// * `avg` - average seconds between accepted shares. Zero means every
///   recorded share arrived within the same second and produces the largest
///   allowed increase. A negative or NaN average (for example after a
///   wall-clock jump) is treated as unusable and leaves the difficulty
///   untouched.
/// * `stats` - the locked job stats of this connection.
///
/// # Errors
///
/// Propagates I/O or serialization errors from sending the new difficulty.
async fn retarget(&self, avg: f64, stats: &mut JobStats) -> Result<()> {
    if avg.is_nan() || avg < 0.0 {
        return Ok(());
    }

    let current = stats.current_difficulty;
    let lowest_step = current - self.options.max_delta;
    let highest_step = current + self.options.max_delta;

    // Dividing by a zero average would yield an infinite (or NaN) proposal;
    // step straight to the largest allowed increase instead.
    let scaled = if avg == 0.0 {
        highest_step
    } else {
        current * (self.options.target_time / avg)
    };

    // Limit the change to `max_delta`; `max` then `min` never produces NaN
    // from finite bounds.
    let mut new_difficulty = scaled.max(lowest_step).min(highest_step);

    if new_difficulty < self.options.min_diff {
        new_difficulty = self.options.min_diff;
    } else if new_difficulty > stats.job_difficulty {
        new_difficulty = stats.job_difficulty;
    }

    if new_difficulty < current || new_difficulty > current {
        stats.last_retarget = Utc::now().timestamp();
        stats.times.clear();
        stats.current_difficulty = new_difficulty;
        // Work-sending code copies `self.difficulty` into the job stats, so
        // the new value is stored there too; otherwise the next job would
        // revert the expected difficulty without telling the miner.
        *self.difficulty.lock().await = new_difficulty;
        self.miner_info.write().await.job_stats = Some(MinerJobStats {
            expected_difficulty: new_difficulty,
        });
        self.set_difficulty(new_difficulty).await?;
    }
    Ok(())
}
/// Handles a request whose parameters did not match any known kind.
///
/// The message is ignored: nothing is sent back and `Ok(())` is returned.
pub async fn handle_unknown(&self, _msg: &serde_json::Value) -> Result<()> {
Ok(())
}
}