use async_std::net::TcpStream;
use async_std::sync::{Arc, Mutex};
use futures::io::BufReader;
use futures::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
use futures::io::{ReadHalf, WriteHalf};
use log::info;
use serde::Serialize;
use std::time::SystemTime;
use stratum_types::params::{ClientParam, PoolParam};
use stratum_types::traits::{
AuthManager, BlockValidator, DataProvider, StratumManager, SubscribeResult,
};
use stratum_types::traits::{PoolParams, StratumParams};
use stratum_types::Result;
use stratum_types::{
ClientPacket, ClientRequest, ClientResponse, MinerInfo, PoolRequest, PoolResponse,
StratumMethod,
};
use uuid::Uuid;
/// Lifecycle state of a miner connection.
///
/// The value is stored behind a mutex on `Connection`; the read loop in
/// `Connection::start` checks it before every read and `Connection::shutdown`
/// flips it to [`State::Disconnect`].
///
/// The enum is fieldless, so it derives `Clone`, `Copy` and `Eq` in addition
/// to `PartialEq`. That lets callers copy the value out of a lock guard and
/// compare it without keeping the guard alive.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum State {
    /// The connection is live and its read loop keeps processing packets.
    Connected,
    /// Shutdown was requested; the read loop exits at its next state check.
    Disconnect,
}
/// State kept for one miner connection, generic over the pool's
/// `StratumManager` implementation.
///
/// Mutable pieces are wrapped in async mutexes because every method on the
/// connection takes `&self`.
#[derive(Debug)]
pub struct Connection<SM: StratumManager> {
/// Connection identifier; `new` fills it with a freshly generated UUID v4
/// string. It is also used as the `id` of every outgoing request/response.
pub id: String,
/// Write side of the TCP stream; locked by `send` for each outgoing message.
pub write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
/// Buffered read side of the TCP stream; locked by `next_message`.
pub read_half: Arc<Mutex<BufReader<ReadHalf<TcpStream>>>>,
/// Starts as `false`; set to `true` by `handle_authorize`. Requests other
/// than authorize are ignored while it is `false`.
pub authorized: Arc<Mutex<bool>>,
/// Source of jobs; `send_work` calls `get_job` on it.
pub data_provider: Arc<SM::DataProvider>,
/// Share validator; `handle_submit` calls `validate_share` on it.
pub block_validator: Arc<SM::BlockValidator>,
/// Handles `authorize` and `subscribe` calls for this miner.
pub auth_manager: Arc<SM::AuthManager>,
/// Wall-clock time captured when the connection object was created.
pub session_start: SystemTime,
/// Lifecycle flag: `start` checks it on every loop iteration and `shutdown`
/// sets it to `State::Disconnect`.
pub state: Mutex<State>,
/// Starts as `false`; set to `true` by `handle_subscribe`.
pub subscribed: Arc<Mutex<bool>>,
/// Last difficulty set through `set_difficulty`; starts at `0.0`.
pub difficulty: Arc<Mutex<f64>>,
/// Starts empty; `handle_subscribe` stores the subscribe result's `id()` here.
pub subscriber_id: Arc<Mutex<String>>,
}
impl<SM> Connection<SM>
where
SM: StratumManager,
{
pub fn new(
rh: BufReader<ReadHalf<TcpStream>>,
wh: WriteHalf<TcpStream>,
data_provider: Arc<SM::DataProvider>,
auth_manager: Arc<SM::AuthManager>,
block_validator: Arc<SM::BlockValidator>,
) -> Self {
let id = Uuid::new_v4().to_string();
info!("Accepting new miner. ID: {}", &id);
Connection {
id,
write_half: Arc::new(Mutex::new(wh)),
read_half: Arc::new(Mutex::new(rh)),
authorized: Arc::new(Mutex::new(false)),
data_provider,
auth_manager,
block_validator,
session_start: SystemTime::now(),
state: Mutex::new(State::Connected),
subscribed: Arc::new(Mutex::new(false)),
difficulty: Arc::new(Mutex::new(0.0)),
subscriber_id: Arc::new(Mutex::new(String::new())),
}
}
/// Runs the read loop for this connection.
///
/// Before each read the state is checked; once it is `State::Disconnect`
/// the loop ends and `Ok(())` is returned. Otherwise the next packet is
/// read and dispatched to `handle_requests` or `handle_responses`.
///
/// # Errors
///
/// Any error from reading, parsing or handling a packet ends the loop and
/// is returned to the caller.
pub async fn start(&self) -> Result<()> {
    // The guard is a temporary of the loop condition, so it is released
    // before the body runs.
    while *self.state.lock().await != State::Disconnect {
        let packet: ClientPacket<SM::PoolParams, SM::StratumParams> =
            self.next_message().await?;

        match packet {
            ClientPacket::Request(request) => self.handle_requests(request).await?,
            ClientPacket::Response(response) => self.handle_responses(response).await?,
        }
    }

    Ok(())
}
/// Dispatches one client request to its handler.
///
/// Authorize requests are always processed. Every other request is
/// silently ignored (returning `Ok(())`) until the connection has been
/// marked authorized.
///
/// # Errors
///
/// Propagates whatever error the selected handler returns.
pub async fn handle_requests(
    &self,
    req: ClientRequest<SM::PoolParams, SM::StratumParams>,
) -> Result<()> {
    // Authorization is the only request honoured before the flag is set.
    if let ClientParam::Authorize(auth) = &req.params {
        return self.handle_authorize(auth).await;
    }

    // Copy the flag out so the mutex guard is released here, not held
    // across the handlers' network awaits below.
    let authorized = *self.authorized.lock().await;
    if !authorized {
        return Ok(());
    }

    match &req.params {
        ClientParam::Submit(share) => self.handle_submit(share).await,
        ClientParam::Subscribe(subscription) => self.handle_subscribe(subscription).await,
        ClientParam::Unknown(raw) => self.handle_unknown(raw).await,
        // Already handled by the early return above; kept so the match
        // stays exhaustive without a wildcard arm.
        ClientParam::Authorize(_) => Ok(()),
    }
}
/// Handles a response packet sent by the client.
///
/// The authorized flag is read, but both paths currently return `Ok(())`
/// without inspecting the response.
pub async fn handle_responses(
    &self,
    _res: ClientResponse<SM::PoolParams, SM::StratumParams>,
) -> Result<()> {
    let is_authorized = *self.authorized.lock().await;

    // Responses from peers that have not authorized are dropped.
    if !is_authorized {
        return Ok(());
    }

    // Nothing is done with responses from authorized peers either.
    Ok(())
}
/// Reads the next newline-delimited JSON packet from the miner.
///
/// Lines that contain only whitespace are skipped. The read half stays
/// locked until a packet has been parsed or an error is returned.
///
/// # Errors
///
/// * An `UnexpectedEof` I/O error when the peer has closed the stream
///   (`read_line` reported zero bytes). Without this check a closed socket
///   would make the loop spin forever on empty reads.
/// * Any I/O error raised while reading.
/// * A `serde_json` error when the line is not a valid packet.
pub async fn next_message(&self) -> Result<ClientPacket<SM::PoolParams, SM::StratumParams>> {
    let mut stream = self.read_half.lock().await;
    // One buffer reused across iterations instead of a fresh String per line.
    let mut buf = String::new();

    loop {
        buf.clear();

        let bytes_read = stream.read_line(&mut buf).await?;
        if bytes_read == 0 {
            // Zero bytes means end of stream, not an empty line.
            return Err(std::io::Error::new(
                std::io::ErrorKind::UnexpectedEof,
                "connection closed by peer",
            )
            .into());
        }

        let line = buf.trim();
        if line.is_empty() {
            continue;
        }

        let msg: ClientPacket<SM::PoolParams, SM::StratumParams> = serde_json::from_str(line)?;
        return Ok(msg);
    }
}
/// Serializes `message` as JSON and writes it to the miner, followed by a
/// single `\n` terminator.
///
/// Serialization happens before the write half is locked; the lock is then
/// held for both writes so the payload and its terminator stay together.
///
/// # Errors
///
/// Returns the serialization error or the first I/O error encountered.
async fn send<T>(&self, message: T) -> Result<()>
where
    T: Serialize,
{
    let payload = serde_json::to_vec(&message)?;

    let mut writer = self.write_half.lock().await;
    writer.write_all(&payload).await?;
    writer.write_all(b"\n").await?;

    Ok(())
}
/// Sends a pool-initiated request to the miner.
///
/// The request's `id` is this connection's id; `method` and `params` are
/// forwarded unchanged.
///
/// # Errors
///
/// Propagates any error from `send`.
async fn send_request(
    &self,
    method: StratumMethod,
    params: PoolParam<SM::PoolParams, SM::StratumParams>,
) -> Result<()> {
    let id = self.id.clone();
    self.send(PoolRequest { id, method, params }).await
}
/// Sends a success response (its `error` field is always `None`) to the miner.
///
/// The response's `id` is this connection's id.
/// NOTE(review): peers may expect the id of the request being answered
/// instead; confirm against the protocol in use.
///
/// # Errors
///
/// Propagates any error from `send`.
async fn send_response(
    &self,
    method: StratumMethod,
    result: PoolParam<SM::PoolParams, SM::StratumParams>,
) -> Result<()> {
    let id = self.id.clone();
    let reply = PoolResponse {
        id,
        method,
        result,
        error: None,
    };
    self.send(reply).await
}
/// Marks the connection as `State::Disconnect`.
///
/// The read loop in `start` sees the new state at its next check, which
/// happens only after the read currently in flight (if any) completes.
/// Always returns `Ok(())`.
pub async fn shutdown(&self) -> Result<()> {
    let mut current = self.state.lock().await;
    *current = State::Disconnect;
    Ok(())
}
/// Processes an authorize request.
///
/// The auth manager is asked to authorize the miner, its verdict is sent
/// back as an `Authorize` response, and the connection is then marked
/// authorized and the event logged.
///
/// # Errors
///
/// Propagates errors from the auth manager and from sending the response.
pub async fn handle_authorize(
    &self,
    auth: &<SM::PoolParams as PoolParams>::Authorize,
) -> Result<()> {
    // The reported miner address is a hard-coded loopback value.
    let miner = Arc::new(MinerInfo {
        ip: "127.0.0.1".parse().unwrap(),
    });

    let verdict = self.auth_manager.authorize(miner, auth).await?;
    let reply = PoolParam::AuthorizeResult(verdict);
    self.send_response(StratumMethod::Authorize, reply).await?;

    // NOTE(review): the flag is set no matter what the auth manager
    // answered; if the verdict can signal a rejection this should
    // probably depend on it. Confirm the verdict's type and fix.
    {
        let mut flag = self.authorized.lock().await;
        *flag = true;
    }

    info!("Authorized Miner: {}", &self.id);
    Ok(())
}
/// Fetches the current job from the data provider and pushes it to the
/// miner as a `Notify` request.
///
/// # Errors
///
/// Propagates errors from the data provider and from sending the request.
pub async fn send_work(&self) -> Result<()> {
    let notification = PoolParam::Notify(self.data_provider.get_job().await?);
    self.send_request(StratumMethod::Notify, notification).await
}
/// Validates a share submitted by the miner and logs the outcome.
///
/// The share is cloned and handed to the block validator. A `true` result
/// is logged as accepted and a `false` result as invalid. The previous
/// version had these two branches swapped, so valid shares were logged as
/// rejected and invalid ones as accepted.
///
/// NOTE(review): only a log line is produced; no accept/reject response is
/// written back to the miner here, despite the "Sending Reject" wording.
///
/// # Errors
///
/// Propagates any error returned by the block validator.
pub async fn handle_submit(
    &self,
    share: &<SM::StratumParams as StratumParams>::Submit,
) -> Result<()> {
    // The reported miner address is a hard-coded loopback value.
    let miner_info = Arc::new(MinerInfo {
        ip: "127.0.0.1".parse().unwrap(),
    });

    info!("Received share from miner: {}", &self.id);

    let valid = self
        .block_validator
        .validate_share(miner_info, share.clone())
        .await?;

    if valid {
        info!("Accepted share from miner: {}", &self.id);
    } else {
        info!("Invalid share from miner: {} - Sending Reject", &self.id);
    }

    Ok(())
}
/// Processes a subscribe request.
///
/// Steps, in order: ask the auth manager to subscribe the miner, remember
/// the returned subscriber id, mark the connection subscribed, send the
/// `Subscribe` response, set the difficulty to `0.5`, and push the first job.
///
/// # Errors
///
/// Stops at, and returns, the first error from any of those steps.
pub async fn handle_subscribe(
    &self,
    subscribe: &<SM::PoolParams as PoolParams>::Subscribe,
) -> Result<()> {
    // The reported miner address is a hard-coded loopback value.
    let miner = Arc::new(MinerInfo {
        ip: "127.0.0.1".parse().unwrap(),
    });
    let subscription = self.auth_manager.subscribe(miner, subscribe).await?;

    // Record the subscription before it is moved into the response.
    let new_id = subscription.id();
    {
        let mut subscriber_id = self.subscriber_id.lock().await;
        *subscriber_id = new_id;
    }
    {
        let mut subscribed = self.subscribed.lock().await;
        *subscribed = true;
    }

    let reply = PoolParam::SubscribeResult(subscription);
    self.send_response(StratumMethod::Subscribe, reply).await?;

    self.set_difficulty(0.5).await?;
    self.send_work().await
}
/// Stores `difficulty` on the connection and announces it to the miner with
/// a `SetDifficulty` request.
///
/// The stored value is updated first, so it keeps the new difficulty even
/// when sending the request fails.
///
/// # Errors
///
/// Propagates any error from sending the request.
async fn set_difficulty(&self, difficulty: f64) -> Result<()> {
    {
        let mut current = self.difficulty.lock().await;
        *current = difficulty;
    }

    let announcement = PoolParam::SetDifficulty(difficulty);
    self.send_request(StratumMethod::SetDifficulty, announcement)
        .await
}
/// Handles a request whose parameters were parsed as `ClientParam::Unknown`.
///
/// The raw JSON value is ignored and `Ok(())` is always returned.
pub async fn handle_unknown(&self, _msg: &serde_json::Value) -> Result<()> {
Ok(())
}
}