#![cfg(feature = "default")]
use crate::api::init_api_server;
pub use crate::miner::Connection;
use crate::BanManager;
pub use crate::MinerList;
use async_std::net::TcpListener;
use async_std::sync::Arc;
use async_std::task;
use futures::io::BufReader;
use futures::io::{AsyncBufReadExt, AsyncReadExt};
use futures::StreamExt;
use log::{info, warn};
use serde::Deserialize;
use std::time::Duration;
use stratum_types::traits::{DataProvider, StratumManager};
use stratum_types::Result;
/// Stratum server state shared between the accept loop and the tasks it spawns.
///
/// Every field other than `config` is held behind an `Arc`, so handles to the
/// same components are cloned into the background polling task and into each
/// per-connection task.
#[derive(Clone)]
pub struct StratumServer<SM: StratumManager> {
/// Listener address plus the settings forwarded to each new connection.
config: ServerConfig,
/// Initialized once at startup, then polled for new templates by a background task.
data_provider: Arc<SM::DataProvider>,
/// Handed to every `Connection` created for an accepted stream.
auth_manager: Arc<SM::AuthManager>,
/// Handed to every `Connection` created for an accepted stream.
block_validator: Arc<SM::BlockValidator>,
/// Connections are added here when accepted and removed when they close;
/// also used to broadcast a new job when the data provider reports one.
connection_list: Arc<MinerList<SM>>,
/// Consulted before a connection is served and updated when a finished
/// connection reports that it needs a ban.
ban_manager: Arc<BanManager>,
}
/// Deserializable settings for a `StratumServer`.
#[derive(Clone, Debug, Deserialize)]
pub struct ServerConfig {
/// Host part of the `host:port` string the TCP listener binds to.
pub host: String,
/// Port part of the bind address. In proxy mode it is also compared with the
/// sixth field of the proxy header, and connections that do not match are dropped.
pub port: u16,
/// NOTE(review): not read by the server code in this file — presumably an
/// intended cap on simultaneous connections; confirm where (or whether) it is enforced.
pub max_connections: Option<usize>,
/// When true, the first line sent on every connection is read as a
/// space-separated header whose third and fifth fields replace the socket's
/// peer address. NOTE(review): the layout matches a PROXY protocol v1 line — confirm.
pub proxy: bool,
/// Passed unchanged to each new `Connection`.
/// NOTE(review): presumably toggles variable difficulty — confirm in `Connection`.
pub var_diff: bool,
/// Passed unchanged to each new `Connection`.
/// NOTE(review): presumably the starting share difficulty — confirm in `Connection`.
pub initial_difficulty: f64,
}
impl<SM> StratumServer<SM>
where
    SM: StratumManager + 'static,
{
    /// Builds a server from its configuration and the shared manager components.
    ///
    /// The server starts with an empty miner list and a fresh ban manager.
    /// Nothing is bound or spawned until [`StratumServer::start`] is awaited.
    pub fn new(
        config: ServerConfig,
        data_provider: Arc<SM::DataProvider>,
        auth_manager: Arc<SM::AuthManager>,
        block_validator: Arc<SM::BlockValidator>,
    ) -> Self {
        let connection_list = Arc::new(MinerList::new());
        StratumServer {
            connection_list,
            config,
            data_provider,
            auth_manager,
            block_validator,
            ban_manager: Arc::new(BanManager::new()),
        }
    }

    /// Runs the one-time startup steps: data provider first, then the API server.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by either initialization step.
    async fn init(&self) -> Result<()> {
        info!("Initializing...");
        self.init_data_provider().await?;
        info!("Data Provider Initialized");
        if cfg!(feature = "default") {
            init_api_server().await?;
            info!("API Server Initialized");
        }
        info!("Initialization Complete");
        Ok(())
    }

    /// Initializes the data provider and spawns the detached task that polls it.
    ///
    /// Every two seconds the task asks the provider for a new template and, when
    /// one is reported, broadcasts a new job to all connected miners.
    ///
    /// # Errors
    ///
    /// Returns the error from the provider's own `init`. Failures inside the
    /// polling task are logged rather than returned.
    async fn init_data_provider(&self) -> Result<()> {
        self.data_provider.init().await?;

        let data_provider = self.data_provider.clone();
        let connection_list = self.connection_list.clone();
        task::spawn(async move {
            loop {
                task::sleep(Duration::from_secs(2)).await;

                // A failed poll must not end the loop: this task is detached, so a
                // panic here would silently stop job updates for every miner.
                let has_new_template = match data_provider.poll_template().await {
                    Ok(has_new_template) => has_new_template,
                    Err(e) => {
                        warn!("Failed to poll for a new template: {:?}", e);
                        false
                    }
                };

                if has_new_template {
                    if let Err(e) = connection_list.broadcast_new_job().await {
                        warn!("Failed to broadcast new job: {:?}", e);
                    }
                }
            }
        });

        Ok(())
    }

    /// Initializes the server, binds the listener and serves connections.
    ///
    /// Each accepted stream is handled in its own task: the peer address is
    /// resolved (from the proxy header when `proxy` is enabled), banned addresses
    /// are refused, and everything else is wrapped in a `Connection`, registered
    /// in the miner list and run until it closes.
    ///
    /// # Errors
    ///
    /// Returns an error if initialization fails, if the listener cannot be bound,
    /// or if accepting a connection fails. Problems that concern a single peer
    /// (unreadable peer address, malformed proxy header, miner list failures) are
    /// logged and only that connection is dropped.
    pub async fn start(&self) -> Result<()> {
        self.init().await?;

        let listening_host = format!("{}:{}", self.config.host, self.config.port);
        let listener = TcpListener::bind(&listening_host).await?;
        let mut incoming = listener.incoming();
        info!("Listening on {}", listening_host);

        while let Some(stream) = incoming.next().await {
            let stream = stream?;

            // The peer may already be gone; that must not stop the listener.
            let peer_addr = match stream.peer_addr() {
                Ok(peer_addr) => peer_addr,
                Err(e) => {
                    warn!("Unable to read peer address, dropping connection: {}", e);
                    continue;
                }
            };

            let (rh, wh) = stream.split();
            let mut buffer_stream = BufReader::new(rh);
            let connection_list = self.connection_list.clone();
            let data_provider = self.data_provider.clone();
            let auth_manager = self.auth_manager.clone();
            let block_validator = self.block_validator.clone();
            let proxy = self.config.proxy;
            let port = self.config.port;
            let var_diff = self.config.var_diff;
            let initial_difficulty = self.config.initial_difficulty;
            let ban_manager = self.ban_manager.clone();

            task::spawn(async move {
                // The proxy header is read inside the task so that a client which
                // connects and then stays silent cannot stall the accept loop.
                // NOTE(review): the header read is not length-limited; consider
                // capping it if the listener is reachable by untrusted clients.
                let addr = if proxy {
                    let mut header = String::new();
                    if let Err(e) = buffer_stream.read_line(&mut header).await {
                        warn!("Failed to read proxy header from {}: {}", peer_addr, e);
                        return;
                    }
                    match parse_proxy_header(&header) {
                        Some((source, destination_port)) if destination_port == port => source,
                        // Addressed to a different port: drop the connection quietly.
                        Some(_) => return,
                        None => {
                            warn!(
                                "Malformed proxy header from {}, dropping connection",
                                peer_addr
                            );
                            return;
                        }
                    }
                } else {
                    peer_addr
                };

                if ban_manager.check_banned(&addr).await {
                    warn!(
                        "Banned connection attempting to connect: {}. Connected closed",
                        addr
                    );
                    return;
                }

                let connection = Arc::new(Connection::new(
                    addr,
                    buffer_stream,
                    wh,
                    data_provider,
                    auth_manager,
                    block_validator,
                    var_diff,
                    initial_difficulty,
                ));

                if let Err(e) = connection_list.add_miner(addr, connection.clone()).await {
                    warn!("Failed to register miner {}: {:?}", addr, e);
                    return;
                }

                info!("Accepting stream from: {}", addr);
                let _result = connection.start().await;
                info!("Closing stream from: {}", addr);

                // Keep going after a failed removal so the ban check below still runs.
                if let Err(e) = connection_list.remove_miner(addr).await {
                    warn!("Failed to remove miner {}: {:?}", addr, e);
                }

                if connection.needs_ban().await {
                    ban_manager.add_ban(&addr).await;
                }
            });
        }

        Ok(())
    }
}

/// Extracts the source address and destination port from a proxy header line.
///
/// The line is trimmed and split on single spaces; the third field is the source
/// IP, the fifth the source port and the sixth the destination port. Returns
/// `None` when a field is missing or does not parse. The IP and port are parsed
/// separately so that IPv6 sources are accepted as well as IPv4 ones.
fn parse_proxy_header(line: &str) -> Option<(std::net::SocketAddr, u16)> {
    let pieces: Vec<&str> = line.trim().split(' ').collect();
    let source_ip: std::net::IpAddr = pieces.get(2)?.parse().ok()?;
    let source_port: u16 = pieces.get(4)?.parse().ok()?;
    let destination_port: u16 = pieces.get(5)?.parse().ok()?;
    Some((
        std::net::SocketAddr::new(source_ip, source_port),
        destination_port,
    ))
}