stratum-server 0.9.28

The server code for the Rust Stratum (v1) implementation
Documentation
#![cfg(feature = "default")]
use crate::api::init_api_server;

pub use crate::miner::Connection;
use crate::BanManager;
pub use crate::MinerList;
use async_std::net::TcpListener;
use async_std::sync::Arc;
use async_std::task;
use futures::io::BufReader;
use futures::io::{AsyncBufReadExt, AsyncReadExt};
use futures::StreamExt;
use log::{info, warn};
use serde::Deserialize;
use std::time::Duration;
use stratum_types::traits::{DataProvider, StratumManager};
use stratum_types::Result;

//@todo turn this into the "server-builder syntax"
//on new
/// A Stratum server generic over a [`StratumManager`] implementation.
///
/// Every shared component is held behind an `Arc`, so cloning the server
/// (it derives `Clone`) clones the handles rather than the components.
#[derive(Clone)]
pub struct StratumServer<SM: StratumManager> {
    /// Listener address and the per-connection settings.
    config: ServerConfig,
    /// Initialised on start-up, polled for new templates every two seconds,
    /// and handed to every accepted connection.
    data_provider: Arc<SM::DataProvider>,
    /// Handed to every accepted connection.
    auth_manager: Arc<SM::AuthManager>,
    /// Handed to every accepted connection.
    block_validator: Arc<SM::BlockValidator>,
    /// Connected miners, keyed by address; miners are added when their
    /// connection starts and removed when it closes. Also used to broadcast
    /// new jobs.
    connection_list: Arc<MinerList<SM>>,
    /// Consulted before a connection is accepted, and updated when a closed
    /// connection reports that it needs a ban.
    ban_manager: Arc<BanManager>,
}

//@todo put into builder
/// Deserializable settings for a [`StratumServer`].
#[derive(Clone, Debug, Deserialize)]
pub struct ServerConfig {
    /// Host to listen on; joined with `port` as `host:port` when binding.
    pub host: String,
    /// Port to listen on. In proxy mode it is also compared with the
    /// destination port announced in the PROXY header.
    pub port: u16,
    /// NOTE(review): not read by any code visible in this file; presumably an
    /// upper bound on simultaneous connections — TODO confirm where (or
    /// whether) it is enforced.
    pub max_connections: Option<usize>,
    /// When `true`, the first line of every connection is read as a PROXY
    /// header (e.g. `PROXY TCP4 <src ip> <dst ip> <src port> <dst port>`) and
    /// the miner's address is taken from it instead of from the socket.
    pub proxy: bool,
    /// Passed unchanged to every new `Connection`; presumably toggles
    /// variable difficulty — TODO confirm against `Connection`.
    pub var_diff: bool,
    /// Passed unchanged to every new `Connection`; presumably the starting
    /// share difficulty — TODO confirm against `Connection`.
    pub initial_difficulty: f64,
}

impl<SM> StratumServer<SM>
where
    SM: StratumManager + 'static,
{
    /// Creates a server from its configuration and the shared manager
    /// components.
    ///
    /// The miner list and the ban manager start out empty. Nothing is bound
    /// or spawned until [`StratumServer::start`] is called.
    pub fn new(
        config: ServerConfig,
        data_provider: Arc<SM::DataProvider>,
        auth_manager: Arc<SM::AuthManager>,
        block_validator: Arc<SM::BlockValidator>,
    ) -> Self {
        let connection_list = Arc::new(MinerList::new());

        StratumServer {
            connection_list,
            config,
            data_provider,
            auth_manager,
            block_validator,
            ban_manager: Arc::new(BanManager::new()),
        }
    }

    /// Initializes the server before any connection is accepted: the data
    /// provider first, then (when the `default` feature is enabled) the API
    /// server.
    ///
    /// # Errors
    ///
    /// Returns the first error reported by either initialization step.
    async fn init(&self) -> Result<()> {
        info!("Initializing...");
        self.init_data_provider().await?;
        info!("Data Provider Initialized");

        if cfg!(feature = "default") {
            init_api_server().await?;
            info!("API Server Initialized");
        }

        info!("Initialization Complete");
        Ok(())
    }

    /// Initializes the data provider and spawns the background task that
    /// polls it every two seconds, broadcasting a new job to all miners
    /// whenever the poll reports `true`.
    ///
    /// # Errors
    ///
    /// Returns the error from the data provider's `init`. Errors that occur
    /// later inside the polling task are logged and the task keeps running.
    async fn init_data_provider(&self) -> Result<()> {
        self.data_provider.init().await?;

        let data_provider = self.data_provider.clone();
        let connection_list = self.connection_list.clone();

        //Spawn the data provider polling.
        task::spawn(async move {
            //@todo could (should?) use interval here instead. but it's still unstable.
            loop {
                task::sleep(Duration::from_secs(2)).await;

                // A failed poll or broadcast must not end this loop: if the
                // task died, miners would never be sent another job.
                match data_provider.poll_template().await {
                    Ok(true) => {
                        if let Err(e) = connection_list.broadcast_new_job().await {
                            warn!("Failed to broadcast new job: {:?}", e);
                        }
                    }
                    Ok(false) => {}
                    Err(e) => warn!("Failed to poll for a new template: {:?}", e),
                }
            }
        });

        Ok(())
    }

    /// Initializes the server, binds `host:port` and accepts connections
    /// until the listener's stream ends.
    ///
    /// Each accepted connection is served on its own task. In proxy mode the
    /// PROXY header is read inside that task, so a slow or malicious client
    /// cannot stall the accept loop, and a malformed header only drops that
    /// one connection.
    ///
    /// # Errors
    ///
    /// Returns an error if initialization fails or the listener cannot be
    /// bound. Failures that concern a single connection (a failed accept, an
    /// unreadable peer address, a bad PROXY header) are logged and skipped.
    pub async fn start(&self) -> Result<()> {
        self.init().await?;

        let listening_host = format!("{}:{}", self.config.host, self.config.port);

        let listener = TcpListener::bind(&listening_host).await?;
        let mut incoming = listener.incoming();

        info!("Listening on {}", listening_host);

        while let Some(stream) = incoming.next().await {
            // A failed accept only concerns that one attempt; keep serving.
            let stream = match stream {
                Ok(stream) => stream,
                Err(e) => {
                    warn!("Failed to accept incoming connection: {}", e);
                    continue;
                }
            };
            // The peer may already have gone away; that is not fatal either.
            let peer_addr = match stream.peer_addr() {
                Ok(peer_addr) => peer_addr,
                Err(e) => {
                    warn!("Failed to read peer address of incoming connection: {}", e);
                    continue;
                }
            };
            let (rh, wh) = stream.split();
            let mut buffer_stream = BufReader::new(rh);

            let connection_list = self.connection_list.clone();
            let data_provider = self.data_provider.clone();
            let auth_manager = self.auth_manager.clone();
            let block_validator = self.block_validator.clone();
            let proxy = self.config.proxy;
            let port = self.config.port;
            let var_diff = self.config.var_diff;
            let initial_difficulty = self.config.initial_difficulty;
            let ban_manager = self.ban_manager.clone();

            task::spawn(async move {
                // Behind a proxy the socket's peer is the proxy itself, so the
                // miner's real address has to come from the PROXY header that
                // precedes the Stratum traffic.
                let addr = if proxy {
                    let mut header = String::new();
                    if let Err(e) = buffer_stream.read_line(&mut header).await {
                        warn!("Failed to read PROXY header from {}: {}", peer_addr, e);
                        return;
                    }

                    match parse_proxy_header(&header, port) {
                        Some(client_addr) => client_addr,
                        None => {
                            warn!(
                                "Dropping connection from {}: invalid PROXY header or wrong destination port",
                                peer_addr
                            );
                            return;
                        }
                    }
                } else {
                    peer_addr
                };

                if ban_manager.check_banned(&addr).await {
                    warn!(
                        "Banned connection attempting to connect: {}. Connected closed",
                        addr
                    );
                    return;
                }

                let connection = Arc::new(Connection::new(
                    addr,
                    buffer_stream,
                    wh,
                    data_provider,
                    auth_manager,
                    block_validator,
                    var_diff,
                    initial_difficulty,
                ));

                if let Err(e) = connection_list.add_miner(addr, connection.clone()).await {
                    warn!("Failed to register miner {}: {:?}", addr, e);
                    return;
                }

                info!("Accepting stream from: {}", addr);

                //Unused for now, but may be useful for logging or bans
                let _result = connection.start().await;

                info!("Closing stream from: {}", addr);

                // Still evaluate the ban below even if removal fails.
                if let Err(e) = connection_list.remove_miner(addr).await {
                    warn!("Failed to remove miner {}: {:?}", addr, e);
                }

                if connection.needs_ban().await {
                    ban_manager.add_ban(&addr).await;
                }
            });
        }
        Ok(())
    }
}

/// Extracts the client address from a PROXY protocol v1 header line such as
/// `"PROXY TCP4 92.118.161.17 172.20.42.228 55867 8080\r\n"`.
///
/// Returns `None` when the line has too few fields, when the source IP or
/// either port does not parse, or when the announced destination port is not
/// `expected_port` (i.e. the client was not trying to reach this server).
fn parse_proxy_header(header: &str, expected_port: u16) -> Option<std::net::SocketAddr> {
    // Space-separated fields:
    // 0 "PROXY", 1 protocol, 2 source ip, 3 dest ip, 4 source port, 5 dest port
    let mut fields = header.trim().split(' ');

    let source_ip: std::net::IpAddr = fields.nth(2)?.parse().ok()?;
    let _dest_ip = fields.next()?;
    let source_port: u16 = fields.next()?.parse().ok()?;
    let dest_port: u16 = fields.next()?.parse().ok()?;

    if dest_port != expected_port {
        return None;
    }

    // Building the address from its parts (instead of parsing "ip:port")
    // also works for IPv6 sources, which are not bracketed in the header.
    Some(std::net::SocketAddr::new(source_ip, source_port))
}