use crate::router::Router;
use async_std::net::{SocketAddr, TcpStream};
use async_std::sync::{Arc, Mutex, RwLock};
use chrono::{NaiveDateTime, Utc};
use futures::io::BufReader;
use futures::io::{AsyncBufReadExt, AsyncWriteExt};
use futures::io::{ReadHalf, WriteHalf};
use log::{debug, info};
use serde::Serialize;
use serde_json::{Map, Value};
use std::time::SystemTime;
use stratum_types::params::{Params, Results, SetDiff};
use stratum_types::Result;
use stratum_types::{
Error, MinerInfo, MinerJobStats, Request, Response, StratumError, StratumMethod, ID,
};
use uuid::Uuid;
#[derive(PartialEq, Debug)]
pub enum ConnectionState {
Connected,
Disconnect,
}
#[derive(Debug)]
pub struct Connection {
pub id: ID,
pub write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
pub read_half: Arc<Mutex<BufReader<ReadHalf<TcpStream>>>>,
pub authorized: Arc<Mutex<bool>>,
pub session_start: SystemTime,
pub connection_state: Mutex<ConnectionState>,
pub subscribed: Arc<Mutex<bool>>,
pub subscriber_id: Arc<Mutex<String>>,
pub miner_info: Arc<RwLock<MinerInfo>>,
pub difficulty: Arc<Mutex<f64>>,
pub submissions: Arc<Mutex<u64>>,
pub last_retarget: Arc<Mutex<SystemTime>>,
pub next_difficulty: Arc<Mutex<f64>>,
pub job_stats: Arc<Mutex<JobStats>>,
pub options: Arc<MinerOptions>,
pub stats: Arc<Mutex<MinerStats>>,
pub needs_ban: Arc<Mutex<bool>>,
pub var_diff: bool,
pub ban_stats: Arc<Mutex<BanStats>>,
pub classic: Arc<Mutex<bool>>,
pub last_message_id: Arc<Mutex<ID>>,
}
#[derive(Debug, Default)]
pub struct JobStats {
last_share_timestamp: i64,
last_retarget: i64,
times: Vec<i64>,
current_difficulty: f64,
job_difficulty: f64,
}
#[derive(Debug, Default)]
pub struct MinerOptions {
retarget_time: i64,
target_time: f64,
min_diff: f64,
max_delta: f64,
variance_percent: u32,
share_time_min: f64,
share_time_max: f64,
}
#[derive(Debug, Clone)]
pub struct MinerStats {
accepted_shares: u64,
rejected_shares: u64,
last_active: NaiveDateTime,
}
#[derive(Debug)]
pub struct BanStats {
accepted_shares: u64,
rejected_shares: u64,
last_active: NaiveDateTime,
}
impl Connection {
pub fn new(
addr: SocketAddr,
rh: BufReader<ReadHalf<TcpStream>>,
wh: WriteHalf<TcpStream>,
var_diff: bool,
initial_difficulty: f64,
) -> Self {
let id = Uuid::new_v4().to_string();
info!("Accepting new miner. ID: {}", &id);
let info = MinerInfo {
ip: addr.ip(),
auth: None,
id: None,
sid: None,
job_stats: None,
worker_name: None,
};
let options = MinerOptions {
retarget_time: 120,
target_time: 6.0,
min_diff: 0.0001,
max_delta: 1.0,
variance_percent: 30,
share_time_min: 4.2,
share_time_max: 7.8,
};
let stats = MinerStats {
accepted_shares: 0,
rejected_shares: 0,
last_active: Utc::now().naive_utc(),
};
let ban_stats = BanStats {
accepted_shares: 0,
rejected_shares: 0,
last_active: Utc::now().naive_utc(),
};
Connection {
id: ID::Str(id),
write_half: Arc::new(Mutex::new(wh)),
read_half: Arc::new(Mutex::new(rh)),
authorized: Arc::new(Mutex::new(false)),
session_start: SystemTime::now(),
connection_state: Mutex::new(ConnectionState::Connected),
subscribed: Arc::new(Mutex::new(false)),
difficulty: Arc::new(Mutex::new(initial_difficulty)),
subscriber_id: Arc::new(Mutex::new(String::new())),
miner_info: Arc::new(RwLock::new(info)),
submissions: Arc::new(Mutex::new(0)),
last_retarget: Arc::new(Mutex::new(SystemTime::now())),
next_difficulty: Arc::new(Mutex::new(0.0)),
job_stats: Arc::new(Mutex::new(Default::default())),
options: Arc::new(options),
stats: Arc::new(Mutex::new(stats)),
var_diff,
needs_ban: Arc::new(Mutex::new(false)),
ban_stats: Arc::new(Mutex::new(ban_stats)),
classic: Arc::new(Mutex::new(false)),
last_message_id: Arc::new(Mutex::new(ID::Str(String::from("")))),
}
}
pub async fn info(&self) -> MinerInfo {
self.miner_info.read().await.clone()
}
pub async fn is_disconnected(&self) -> bool {
*self.connection_state.lock().await == ConnectionState::Disconnect
}
pub async fn next_message(
&self,
) -> Result<(String, serde_json::map::Map<String, serde_json::Value>)> {
loop {
let mut stream = self.read_half.lock().await;
let mut buf = String::new();
let num_bytes = stream.read_line(&mut buf).await?;
if num_bytes == 0 {
self.shutdown().await?;
return Err(Error::StreamClosed);
}
if !buf.is_empty() {
buf = buf.trim().to_owned();
debug!("Received Message: {}", &buf);
dbg!(&buf);
let msg: Map<String, Value> = serde_json::from_str(&buf)?;
let method = if msg.contains_key("method") {
match msg.get("method") {
Some(method) => method.as_str(),
None => return Err(Error::MethodDoesntExist),
}
} else if msg.contains_key("messsage") {
match msg.get("message") {
Some(method) => method.as_str(),
None => return Err(Error::MethodDoesntExist),
}
} else {
return Err(Error::MethodDoesntExist);
};
if let Some(method_string) = method {
return Ok((method_string.to_owned(), msg));
} else {
return Err(Error::MethodDoesntExist);
}
};
}
}
async fn send<T: Serialize>(&self, message: T) -> Result<()> {
let msg = serde_json::to_vec(&message)?;
let msg_string = serde_json::to_string(&message)?;
debug!("Sending message: {}", msg_string);
let mut stream = self.write_half.lock().await;
stream.write_all(&msg).await?;
stream.write_all(b"\n").await?;
Ok(())
}
pub async fn shutdown(&self) -> Result<()> {
*self.connection_state.lock().await = ConnectionState::Disconnect;
Ok(())
}
async fn retarget(&self, avg: f64, stats: &mut JobStats) -> Result<()> {
let mut new_difficulty = stats.current_difficulty * (self.options.target_time / avg);
let delta = (new_difficulty - stats.current_difficulty).abs();
if delta > self.options.max_delta {
if new_difficulty > stats.current_difficulty {
new_difficulty = new_difficulty - (delta - self.options.max_delta);
} else if new_difficulty < stats.current_difficulty {
new_difficulty = new_difficulty + (delta - self.options.max_delta);
}
}
if new_difficulty < self.options.min_diff {
new_difficulty = self.options.min_diff;
} else if new_difficulty > stats.job_difficulty {
new_difficulty = stats.job_difficulty;
}
if new_difficulty < stats.current_difficulty || new_difficulty > stats.current_difficulty {
stats.last_retarget = Utc::now().timestamp();
stats.times = Vec::new();
stats.current_difficulty = new_difficulty;
let job_stats = MinerJobStats {
expected_difficulty: new_difficulty,
};
self.miner_info.write().await.job_stats = Some(job_stats);
}
Ok(())
}
pub async fn handle_unknown(&self, _msg: &serde_json::Value) -> Result<()> {
Ok(())
}
pub async fn disconnect(&self) {
*self.connection_state.lock().await = ConnectionState::Disconnect;
}
pub async fn ban(&self) {
*self.needs_ban.lock().await = true;
}
pub async fn needs_ban(&self) -> bool {
*self.needs_ban.lock().await
}
pub async fn get_stats(&self) -> MinerStats {
self.stats.lock().await.clone()
}
}