// Stratum TCP server: accepts miner connections and drives each one on its own task.
pub use crate::miner::Connection;
// pub use crate::responses::*;
// use crate::types::StratumImpl;
use async_std::net::TcpListener;
use async_std::sync::{Arc, Mutex};
use async_std::task;
use futures::StreamExt;
// use lru_time_cache::*;
use serde::Deserialize;
use stratum_types::traits::StratumManager;
use stratum_types::Result;

//@todo turn this into the "server-builder syntax"
//on new
/// TCP server that accepts incoming connections and wraps each one in a
/// [`Connection`] sharing a single [`StratumManager`].
pub struct StratumServer<SM>
where
    SM: StratumManager,
{
    /// Host and port the listener binds to.
    config: ServerConfig,
    /// Manager handed (as a cloned `Arc`) to every new `Connection`.
    manager: Arc<SM>,
    // miner_connections: Mutex<LruCache<String, Arc<Miner>>>,
    // @todo this ban method is not the best, fix this.
    // miner_bans: Mutex<LruCache<IpAddr, bool>>,
    // @todo make this an LRU of time and capacity?
    // @todo make this a struct, so we can impl functions on it like broadcast.
    /// Connections created by `start`, shared behind an async mutex.
    connection_list: Arc<Mutex<Vec<Arc<Connection<SM>>>>>,
    // job_provider: Arc<JobProvider>, @todo make this a part of stratum manager
}

//@todo put into builder
/// Listener settings for the server; deserializable via serde.
#[derive(Clone, Debug, Deserialize)]
pub struct ServerConfig {
    /// Host part of the bind address; joined with `port` as `"host:port"`.
    pub host: String,
    /// TCP port part of the bind address.
    pub port: u16,
    /// Optional connection cap.
    // NOTE(review): not read by any code visible in this section — confirm
    // whether it is enforced elsewhere.
    pub max_connections: Option<usize>,
}

impl<SM> StratumServer<SM>
where
    SM: StratumManager + 'static,
{
    /// Creates a server with the given configuration and manager and an
    /// empty connection list. Nothing is bound until [`start`](Self::start)
    /// is awaited.
    pub fn new(config: ServerConfig, manager: Arc<SM>) -> Self {
        StratumServer {
            connection_list: Arc::new(Mutex::new(Vec::new())),
            config,
            manager,
        }
    }

    /// Binds to `config.host:config.port` and serves connections until the
    /// incoming stream ends.
    ///
    /// Every accepted stream is wrapped in a [`Connection`], recorded in the
    /// connection list, and driven on its own spawned task. When that task's
    /// `Connection::start` future completes (successfully or not), the
    /// connection is removed from the list again.
    ///
    /// # Errors
    ///
    /// Returns an error if binding the listener fails or if accepting a
    /// connection fails. A failure of an individual connection (including a
    /// peer whose address can no longer be read) is logged and does not stop
    /// the server.
    pub async fn start(&self) -> Result<()> {
        let listening_host = format!("{}:{}", self.config.host, self.config.port);

        let listener = TcpListener::bind(&listening_host).await?;
        let mut incoming = listener.incoming();

        // info!("Listening on {}", listening_host);
        println!("Listening on {}", listening_host);

        while let Some(stream) = incoming.next().await {
            let stream = stream?;

            // A peer can vanish between accept and this call; that is a
            // problem with this one connection, not with the listener.
            let addr = match stream.peer_addr() {
                Ok(addr) => addr,
                Err(err) => {
                    eprintln!("Dropping connection with unreadable peer address: {}", err);
                    continue;
                }
            };

            let connection = Arc::new(Connection::new(stream, self.manager.clone()));

            // The guard is a temporary, so the lock is released as soon as
            // the push is done.
            self.connection_list
                .lock()
                .await
                .push(Arc::clone(&connection));

            let connection_list = Arc::clone(&self.connection_list);

            task::spawn(async move {
                // info!("Accepting stream from: {}", addr);
                println!("Accepting stream from: {}", addr);

                // Run on a separate handle so `connection` stays available
                // for the identity comparison during cleanup below.
                let running = Arc::clone(&connection);
                if let Err(err) = running.start().await {
                    // Log instead of panicking: one bad connection must not
                    // abort its task with an unwinding panic.
                    eprintln!("Connection from {} ended with error: {:?}", addr, err);
                }

                // Remove this connection from the shared list so finished
                // connections do not accumulate for the life of the server.
                //@todo consider keeping a cache of recently closed connections
                //instead, so a dropped miner can reconnect with a similar ID,
                //and index connections by ID rather than IP.
                connection_list
                    .lock()
                    .await
                    .retain(|known| !Arc::ptr_eq(known, &connection));

                println!("Closing stream from: {}", addr);
            });
        }

        Ok(())
    }
}

//impl<T> StratumServer<T>
//where
//    T: StratumImpl,
//{
//    pub fn new(
//        // app: Arc<App>,
//        server_config: &ServerConfig,
//        stratum: T,
//        // job_provider: Arc<JobProvider>
//    ) -> StratumServer<T> {
//        // let time_to_live = Duration::from_secs(60 * 60 * 2);
//        // We only issue short bans - these are just to keep people from being able to cheaply overload
//        // the server by falsely submitting low-difficulty shares.
//        let ban_length = Duration::from_secs(60 * 5);
//        StratumServer {
//            config: server_config.clone(),
//            // server: listener,
//            // app,
//            // miner_connections: Mutex::new(LruCache::with_expiry_duration_and_capacity(
//            //     time_to_live,
//            //     server_config.max_connections.unwrap_or(10000),
//            // )),
//            miner_bans: Mutex::new(LruCache::with_expiry_duration(ban_length)),
//            stratum,
//            // job_provider,
//            // nonce_pattern: Regex::new("[0-9a-f]{8}").unwrap(),
//        }
//    }