use crate::server::VarDiffConfig;
use crate::{Result, ID};
use async_std::net::SocketAddr;
use async_std::sync::{Arc, Mutex, RwLock};
use chrono::{Duration, NaiveDateTime, Utc};
use encodings::{FromHex, ToHex};
use extended_primitives::Buffer;
use futures::channel::mpsc::UnboundedSender;
use futures::SinkExt;
use log::{debug, info, warn};
use rand::prelude::*;
use serde::Serialize;
use std::net::IpAddr;
use std::time::SystemTime;
use uuid::Uuid;
#[derive(PartialEq, Debug)]
pub enum ConnectionState {
Connected,
Disconnect,
}
#[derive(Debug)]
pub struct Connection<State> {
pub sender: Arc<Mutex<UnboundedSender<String>>>,
pub authorized: Arc<Mutex<bool>>,
pub subscribed: Arc<Mutex<bool>>,
pub session_start: SystemTime,
pub connection_state: Mutex<ConnectionState>,
pub difficulty: Arc<Mutex<f64>>,
pub stats: Arc<Mutex<MinerStats>>,
pub needs_ban: Arc<Mutex<bool>>,
pub next_difficulty: Arc<Mutex<Option<f64>>>,
pub id: Uuid,
pub subscriber_id: Arc<Mutex<String>>,
pub miner_info: Arc<RwLock<MinerInfo>>,
pub job_stats: Arc<Mutex<JobStats>>,
pub options: Arc<MinerOptions>,
pub var_diff: Arc<Mutex<bool>>,
pub last_message_id: Arc<Mutex<ID>>,
pub worker_info: Arc<Mutex<WorkerInfo>>,
pub state: Arc<Mutex<State>>,
}
#[derive(Clone, Debug)]
pub struct MinerInfo {
pub ip: IpAddr,
pub auth: Option<MinerAuth>,
pub id: Option<Uuid>,
pub sid: Option<String>,
pub job_stats: Option<MinerJobStats>,
pub name: Option<String>,
}
#[derive(Clone, Debug)]
pub struct MinerAuth {
pub id: String,
pub username: String,
pub client: String,
}
#[derive(Clone, Debug)]
pub struct MinerJobStats {
pub expected_difficulty: f64,
}
#[derive(Debug)]
pub struct JobStats {
last_timestamp: i64,
last_share_timestamp: i64,
last_retarget: i64,
times: Vec<i64>,
current_difficulty: f64,
job_difficulty: f64,
}
#[derive(Debug, Default)]
pub struct MinerOptions {
retarget_time: i64,
target_time: f64,
min_diff: f64,
max_diff: f64,
max_delta: f64,
variance_percent: f64,
}
#[derive(Debug, Clone)]
pub struct MinerStats {
accepted_shares: u64,
rejected_shares: u64,
last_active: NaiveDateTime,
}
#[derive(Debug, Clone, Default)]
pub struct WorkerInfo {
pub client: Option<String>,
pub name: Option<String>,
pub sid: Option<Buffer>,
pub account_id: i32,
pub id: Uuid,
}
impl<State: Clone + Send + Sync + 'static> Connection<State> {
pub fn new(
addr: SocketAddr,
sender: UnboundedSender<String>,
initial_difficulty: f64,
var_diff_config: VarDiffConfig,
state: State,
) -> Self {
let id = Uuid::new_v4();
info!("Accepting new miner. ID: {}", &id);
let info = MinerInfo {
ip: addr.ip(),
auth: None,
id: None,
sid: None,
job_stats: None,
name: None,
};
let options = MinerOptions {
retarget_time: var_diff_config.retarget_time,
target_time: var_diff_config.target_time,
min_diff: var_diff_config.minimum_difficulty,
max_diff: var_diff_config.maximum_difficulty,
max_delta: 1.0,
variance_percent: var_diff_config.variance_percent,
};
let stats = MinerStats {
accepted_shares: 0,
rejected_shares: 0,
last_active: Utc::now().naive_utc(),
};
let job_stats = JobStats {
last_share_timestamp: 0,
last_timestamp: Utc::now().naive_utc().timestamp(),
last_retarget: Utc::now().naive_utc().timestamp() - options.retarget_time / 2,
times: Vec::new(),
current_difficulty: initial_difficulty,
job_difficulty: 0.0,
};
Connection {
id,
sender: Arc::new(Mutex::new(sender)),
authorized: Arc::new(Mutex::new(false)),
session_start: SystemTime::now(),
connection_state: Mutex::new(ConnectionState::Connected),
subscribed: Arc::new(Mutex::new(false)),
difficulty: Arc::new(Mutex::new(initial_difficulty)),
subscriber_id: Arc::new(Mutex::new(String::new())),
miner_info: Arc::new(RwLock::new(info)),
next_difficulty: Arc::new(Mutex::new(None)),
job_stats: Arc::new(Mutex::new(job_stats)),
options: Arc::new(options),
stats: Arc::new(Mutex::new(stats)),
var_diff: Arc::new(Mutex::new(var_diff_config.var_diff)),
needs_ban: Arc::new(Mutex::new(false)),
last_message_id: Arc::new(Mutex::new(ID::Str(String::from("")))),
worker_info: Arc::new(Mutex::new(Default::default())),
state: Arc::new(Mutex::new(state)),
}
}
pub async fn info(&self) -> MinerInfo {
self.miner_info.read().await.clone()
}
pub async fn is_disconnected(&self) -> bool {
*self.connection_state.lock().await == ConnectionState::Disconnect
}
pub async fn send<T: Serialize>(&self, message: T) -> Result<()> {
let last_active = self.stats.lock().await.last_active;
let last_active_ago = Utc::now().naive_utc() - last_active;
if last_active_ago > Duration::seconds(600) {
warn!(
"Miner: {} not active since {}. Disconnecting",
self.id, last_active
);
self.ban().await;
return Ok(());
}
let msg_string = serde_json::to_string(&message)?;
debug!("Sending message: {}", msg_string.clone());
let mut sender = self.sender.lock().await;
sender.send(msg_string).await?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
*self.connection_state.lock().await = ConnectionState::Disconnect;
Ok(())
}
async fn retarget(&self) {
let now = Utc::now().naive_utc().timestamp();
let mut job_stats = self.job_stats.lock().await;
job_stats.current_difficulty = *self.difficulty.lock().await;
let since_last = now - job_stats.last_timestamp;
job_stats.times.push(since_last);
job_stats.last_timestamp = now;
if now - job_stats.last_retarget < self.options.retarget_time {
return;
}
let variance = self.options.target_time * (self.options.variance_percent as f64 / 100.0);
let time_min = self.options.target_time - variance;
let time_max = self.options.target_time + variance;
job_stats.last_retarget = now;
let avg: i64 = job_stats.times.iter().sum::<i64>() / job_stats.times.len() as i64;
let mut d_dif = self.options.target_time / avg as f64;
if avg as f64 > time_max && job_stats.current_difficulty > self.options.min_diff {
if d_dif * job_stats.current_difficulty < self.options.min_diff {
d_dif = self.options.min_diff / job_stats.current_difficulty;
}
} else if (avg as f64) < time_min {
if d_dif * job_stats.current_difficulty > self.options.max_diff {
d_dif = self.options.max_diff / job_stats.current_difficulty;
}
} else {
return;
}
let new_diff = job_stats.current_difficulty * d_dif;
*self.next_difficulty.lock().await = Some(new_diff);
job_stats.times.clear();
}
pub async fn disconnect(&self) {
*self.connection_state.lock().await = ConnectionState::Disconnect;
}
pub async fn ban(&self) {
*self.needs_ban.lock().await = true;
self.disconnect().await;
}
pub async fn needs_ban(&self) -> bool {
*self.needs_ban.lock().await
}
pub async fn get_stats(&self) -> MinerStats {
self.stats.lock().await.clone()
}
pub async fn ip(&self) -> IpAddr {
self.info().await.ip
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn set_sid(&self, sid: Option<String>, sid_size: usize) -> String {
let sid = match sid {
Some(sid) => Buffer::from_hex(sid).unwrap(),
None => {
let mut id = vec![0; sid_size];
let mut rng = rand::thread_rng();
rng.fill_bytes(&mut id);
Buffer::from(id)
}
};
self.worker_info.lock().await.sid = Some(sid.clone());
sid.to_hex()
}
pub async fn set_var_diff(&self, var_diff: bool) {
*self.var_diff.lock().await = var_diff;
}
pub async fn set_worker_name(&self, name: Option<String>) {
self.worker_info.lock().await.name = name;
}
pub async fn set_account_id(&self, id: i32) {
self.worker_info.lock().await.account_id = id;
}
pub async fn get_account_id(&self) -> i32 {
self.worker_info.lock().await.account_id
}
pub async fn set_client(&self, client: &str) {
self.worker_info.lock().await.client = Some(client.to_owned());
}
pub async fn get_client(&self) -> Option<String> {
self.worker_info.lock().await.client.clone()
}
pub async fn set_worker_id(&self, id: Uuid) {
self.worker_info.lock().await.id = id;
}
pub async fn get_sid(&self) -> Option<Buffer> {
self.worker_info.lock().await.sid.clone()
}
pub async fn get_worker(&self) -> WorkerInfo {
self.worker_info.lock().await.clone()
}
pub async fn authorized(&self) -> bool {
*self.authorized.lock().await
}
pub async fn authorize(&self) {
*self.authorized.lock().await = true;
}
pub async fn subscribed(&self) -> bool {
*self.subscribed.lock().await
}
pub async fn subscribe(&self) {
*self.subscribed.lock().await = true;
}
pub async fn set_difficulty(&self, difficulty: f64) {
*self.difficulty.lock().await = difficulty;
}
pub async fn get_difficulty(&self) -> f64 {
*self.difficulty.lock().await
}
pub async fn get_state(&self) -> State {
self.state.lock().await.clone()
}
pub async fn set_state(&self, state: State) {
*self.state.lock().await = state;
}
pub async fn valid_share(&self) {
self.stats.lock().await.accepted_shares += 1;
self.consider_ban().await;
if *self.var_diff.lock().await {
self.retarget().await;
}
}
pub async fn update_difficulty(&self) -> Option<f64> {
let next_difficulty = *self.next_difficulty.lock().await;
if let Some(next_difficulty) = next_difficulty {
*self.difficulty.lock().await = next_difficulty;
*self.next_difficulty.lock().await = None;
Some(next_difficulty)
} else {
None
}
}
pub async fn invalid_share(&self) {
self.stats.lock().await.rejected_shares += 1;
self.consider_ban().await;
}
pub async fn consider_ban(&self) {
let accepted = self.stats.lock().await.accepted_shares;
let rejected = self.stats.lock().await.rejected_shares;
let total = accepted + rejected;
let check_threshold = 500;
let invalid_percent = 50.0;
if total >= check_threshold {
let percent_bad: f64 = (rejected as f64 / total as f64) * 100.0;
if percent_bad < invalid_percent {
} else {
warn!(
"Miner: {} banned. {} out of the last {} shares were invalid",
self.id, rejected, total
);
self.ban().await;
}
}
}
}