use crate::types::{Event, Message, State};
use async_std::net::TcpStream;
use async_std::sync::{Arc, Mutex};
use async_std::task;
use futures::channel::mpsc::{unbounded, UnboundedReceiver, UnboundedSender};
use futures::io::BufReader;
use futures::io::{AsyncBufReadExt, AsyncReadExt, AsyncWriteExt};
use futures::io::{ReadHalf, WriteHalf};
use futures::sink::SinkExt;
use futures::StreamExt;
use serde::{Deserialize, Serialize};
use std::time::Duration;
use stratum_types::params::{ClientParam, PoolParam};
use stratum_types::traits::{PoolParams, StratumParams, Subscribe, SubscribeResult};
use stratum_types::Error;
use stratum_types::Result;
use stratum_types::{PoolPacket, Request, StratumError, StratumMethod, ID};
#[derive(Clone, Debug, Deserialize)]
pub struct ClientConfig<PP>
where
PP: PoolParams,
{
pub host: String,
pub credentials: PP::Authorize,
pub subscriber_info: PP::Subscribe,
}
#[derive(Debug, Clone)]
pub struct StratumClient<PP, SP>
where
PP: PoolParams,
SP: StratumParams,
{
event_rx: Arc<Mutex<UnboundedReceiver<Event>>>,
event_tx: Arc<Mutex<UnboundedSender<Event>>>,
message_tx: Arc<Mutex<UnboundedSender<Message<SP>>>>,
message_rx: Arc<Mutex<UnboundedReceiver<Message<SP>>>>,
pub write_half: Arc<Mutex<WriteHalf<TcpStream>>>,
pub read_half: Arc<Mutex<BufReader<ReadHalf<TcpStream>>>>,
pub config: Arc<ClientConfig<PP>>,
state: Arc<Mutex<State>>,
current_job: Arc<Mutex<Option<SP::Notify>>>,
new_work: Arc<Mutex<bool>>,
difficulty: Arc<Mutex<f64>>,
subscriber_info: Arc<Mutex<PP::Subscribe>>,
subscribe_id: Arc<Mutex<String>>,
}
impl<PP, SP> StratumClient<PP, SP>
where
PP: PoolParams
+ Serialize
+ for<'de> Deserialize<'de>
+ Sync
+ Send
+ std::fmt::Debug
+ Clone
+ 'static,
SP: StratumParams
+ Serialize
+ for<'de> Deserialize<'de>
+ Sync
+ Send
+ std::fmt::Debug
+ Clone
+ 'static,
{
pub async fn new(config: ClientConfig<PP>, sid: Option<String>) -> Result<Self> {
let stream = TcpStream::connect(&config.host).await?;
let (rh, wh) = stream.split();
let (tx, rx) = unbounded();
let (m_tx, m_rx) = unbounded();
let subscribe_id = sid.unwrap_or("".to_owned());
Ok(StratumClient {
config: Arc::new(config.clone()),
write_half: Arc::new(Mutex::new(wh)),
read_half: Arc::new(Mutex::new(BufReader::new(rh))),
state: Arc::new(Mutex::new(State::Connected)),
event_tx: Arc::new(Mutex::new(tx)),
event_rx: Arc::new(Mutex::new(rx)),
message_rx: Arc::new(Mutex::new(m_rx)),
message_tx: Arc::new(Mutex::new(m_tx)),
current_job: Arc::new(Mutex::new(None)),
new_work: Arc::new(Mutex::new(false)),
difficulty: Arc::new(Mutex::new(0.0)),
subscriber_info: Arc::new(Mutex::new(config.subscriber_info)),
subscribe_id: Arc::new(Mutex::new(subscribe_id)),
})
}
pub async fn init_connection(&self) -> Result<()> {
*self.state.lock().await = State::Connected;
if !self.authorize().await? {
return Ok(());
}
self.subscribe().await?;
Ok(())
}
pub async fn init_server(&self) -> Result<()> {
self.init_connection().await?;
let send_self = self.clone();
let sender_handle = task::spawn(async move {
send_self.send_loop().await;
});
self.receive_loop().await;
self.disconnect().await;
sender_handle.await;
Ok(())
}
pub fn run(&self) -> Result<()> {
async_std::task::block_on(self.start())
}
pub async fn start(&self) -> Result<()> {
let mut retry_count = 0;
let timeout = Duration::from_secs(10);
loop {
if retry_count > 0 {
println!(
"Connection broken, attempting reconnect. Current attempts: {}",
retry_count
);
}
self.init_server().await;
println!("Connection broken, attempting to reconnect");
let result = TcpStream::connect(&self.config.host).await;
if let Ok(stream) = result {
println!("Connection Reestablished. Continuing...");
let (rh, wh) = stream.split();
*self.write_half.lock().await = wh;
*self.read_half.lock().await = BufReader::new(rh);
println!("Initializating the server");
self.init_server().await;
} else {
if retry_count > 12 {
break;
}
async_std::task::sleep(timeout).await;
retry_count += 1;
}
}
Ok(())
}
async fn receive_loop(&self) -> Result<()> {
loop {
if self.check_disconnect().await {
break;
}
let msg = self.next_message().await?;
match msg {
PoolPacket::Request(req) => self.handle_requests(req).await?,
PoolPacket::Response(res) => {
if let Some(result) = res.result {
self.handle_responses(result).await?;
} else if let Some(error) = res.error {
self.handle_errors(error).await?;
} else {
}
}
};
}
Ok(())
}
async fn handle_requests(&self, req: Request<PoolParam<PP, SP>>) -> Result<()> {
match req.params {
PoolParam::SubmitResult(accepted) => self.handle_submit_result(accepted).await?,
PoolParam::Notify(job) => self.handle_notify(job).await?,
PoolParam::SetDifficulty(diff) => self.handle_difficulty(diff).await?,
_ => {
dbg!("Got an invaid msg");
}
};
Ok(())
}
async fn handle_responses(&self, result: PoolParam<PP, SP>) -> Result<()> {
if let PoolParam::AuthorizeResult(auth) = &result {
self.handle_authorize_return(auth).await?;
return Ok(());
}
match result {
PoolParam::SubscribeResult(info) => self.handle_subscribe_result(info).await?,
PoolParam::SubmitResult(_) => println!("Share Accepted!"),
_ => {
println!("Recieved unknown packet");
}
}
Ok(())
}
async fn handle_errors(&self, error: StratumError) -> Result<()> {
println!("Error: {}", error.to_string());
Ok(())
}
async fn send_loop(&self) -> Result<()> {
loop {
if self.check_disconnect().await {
break;
}
while let Some(msg) = self.message_rx.lock().await.next().await {
match msg {
Message::Submit(share) => self.handle_submit_message(share).await?,
}
}
}
Ok(())
}
async fn check_disconnect(&self) -> bool {
*self.state.lock().await == State::Disconnect
}
async fn disconnect(&self) {
*self.state.lock().await = State::Disconnect;
}
pub async fn next_message(&self) -> Result<PoolPacket<PP, SP>> {
loop {
let mut stream = self.read_half.lock().await;
let mut buf = String::new();
let amt = stream.read_line(&mut buf).await?;
if amt == 0 {
return Err(Error::StreamClosed);
}
buf = buf.trim().to_string();
if !buf.is_empty() {
let response: PoolPacket<PP, SP> = serde_json::from_str(&buf)?;
return Ok(response);
};
}
}
async fn send<T: Serialize>(&self, message: T) -> Result<()> {
let msg = serde_json::to_vec(&message)?;
let mut stream = self.write_half.lock().await;
stream.write_all(&msg).await?;
stream.write_all(b"\n").await?;
Ok(())
}
async fn send_request(&self, method: StratumMethod, params: ClientParam<PP, SP>) -> Result<()> {
let msg = Request {
id: ID::Str("".to_owned()),
method,
params,
};
Ok(self.send(msg).await?)
}
pub async fn authorize(&self) -> Result<bool> {
let authorize = self.config.credentials.clone();
self.send_request(StratumMethod::Authorize, ClientParam::Authorize(authorize))
.await?;
let msg = self.next_message().await?;
let mut authorize_success = false;
if let PoolPacket::Response(res) = msg {
if let Some(PoolParam::AuthorizeResult(auth)) = &res.result {
self.handle_authorize_return(auth).await?;
authorize_success = true;
}
}
Ok(authorize_success)
}
pub async fn subscribe(&self) -> Result<()> {
let subscribe = self.subscriber_info.lock().await;
self.send_request(
StratumMethod::Subscribe,
ClientParam::Subscribe(subscribe.clone()),
)
.await?;
Ok(())
}
pub async fn handle_authorize_return(&self, _auth: &PP::AuthorizeResult) -> Result<()> {
Ok(())
}
pub async fn submit(&self, share: SP::Submit) -> Result<()> {
let msg = Message::Submit(share);
self.message_tx.lock().await.send(msg).await?;
Ok(())
}
async fn handle_submit_message(&self, share: SP::Submit) -> Result<()> {
*self.new_work.lock().await = false;
Ok(self
.send_request(StratumMethod::Submit, ClientParam::Submit(share))
.await?)
}
async fn handle_subscribe_result(&self, info: PP::SubscribeResult) -> Result<()> {
*self.subscribe_id.lock().await = info.id();
self.subscriber_info.lock().await.set_sid(&info.id());
Ok(())
}
async fn handle_submit_result(&self, _accepted: bool) -> Result<()> {
Ok(())
}
async fn handle_notify(&self, job: SP::Notify) -> Result<()> {
*self.current_job.lock().await = Some(job);
Ok(())
}
async fn handle_difficulty(&self, diff: f64) -> Result<()> {
println!("Updating difficulty to {}", diff);
*self.difficulty.lock().await = diff;
Ok(())
}
pub async fn is_new_work(&self) -> bool {
*self.new_work.lock().await
}
pub async fn get_difficulty(&self) -> f64 {
*self.difficulty.lock().await
}
pub async fn get_work(&self) -> Option<SP::Notify> {
let work = self.current_job.lock().await;
work.clone()
}
pub async fn wait_for_work(&self) -> Result<()> {
loop {
if self.current_job.lock().await.is_some() {
break;
}
async_std::task::sleep(Duration::from_secs(2)).await;
}
Ok(())
}
pub async fn get_subscriber_id(&self) -> String {
self.subscribe_id.lock().await.clone()
}
}