use crate::{server::VarDiffConfig, Miner, MinerList, Result};
use async_std::sync::{Arc, Mutex};
use extended_primitives::Buffer;
use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
SinkExt, StreamExt,
};
use log::{debug, trace};
use serde::Serialize;
use serde_json::json;
use std::time::SystemTime;
use stop_token::{StopSource, StopToken};
use uuid::Uuid;
/// Account identifiers attached to a connection.
///
/// `Connection::new` starts with both ids at `0` and no worker name;
/// `Connection::set_user_info` overwrites all three fields at once.
#[derive(Clone, Debug)]
pub struct UserInfo {
    /// Account id as supplied to `set_user_info`.
    pub account_id: i32,
    /// Mining-account id as supplied to `set_user_info`.
    pub mining_account: i32,
    /// Optional worker name; `None` until one is provided.
    pub worker_name: Option<String>,
}
/// Mutable per-connection flags and metadata.
///
/// `Connection` keeps one of these behind a mutex and hands out clones via
/// `Connection::get_connection_info`.
#[derive(Clone, Debug)]
pub struct ConnectionInfo {
    /// Set by `Connection::set_client` when the client string starts with
    /// `btccom-agent/`.
    pub agent: bool,
    /// Flipped to `true` by `Connection::authorize`.
    pub authorized: bool,
    /// Flipped to `true` by `Connection::subscribe`.
    pub subscribed: bool,
    /// Client identifier recorded by `Connection::set_client`, if any.
    pub client: Option<String>,
    /// Wall-clock time captured when this value was constructed.
    pub session_start: SystemTime,
    /// Whether the connection is still connected or has been marked for disconnect.
    pub state: ConnectionState,
    /// Set together with `agent` by `Connection::set_client`.
    // NOTE(review): the consumer of this flag is not in this file — presumably
    // it selects a longer idle timeout; confirm against the reader.
    pub is_long_timeout: bool,
}
impl Default for ConnectionInfo {
fn default() -> Self {
Self::new()
}
}
impl ConnectionInfo {
    /// Creates the info record for a fresh connection: every flag cleared, no
    /// client string, state `Connected`, and `session_start` set to the
    /// current system time.
    pub fn new() -> Self {
        let session_start = SystemTime::now();
        Self {
            agent: false,
            authorized: false,
            subscribed: false,
            client: None,
            session_start,
            state: ConnectionState::Connected,
            is_long_timeout: false,
        }
    }
}
/// Lifecycle marker stored in `ConnectionInfo::state`.
#[derive(Clone, Debug, PartialEq)]
pub enum ConnectionState {
    /// Initial state assigned by `ConnectionInfo::new`.
    Connected,
    /// Set by `Connection::disconnect`, `Connection::shutdown` and
    /// `Connection::ban`; reported by `Connection::is_disconnected`.
    Disconnect,
}
#[derive(Debug)]
pub struct Connection<State> {
pub id: Uuid,
pub session_id: u32,
pub info: Arc<Mutex<ConnectionInfo>>,
pub user_info: Arc<Mutex<UserInfo>>,
pub sender: Arc<Mutex<UnboundedSender<String>>>,
pub upstream_sender: Arc<Mutex<UnboundedSender<String>>>,
pub upstream_receiver:
Arc<Mutex<UnboundedReceiver<serde_json::map::Map<String, serde_json::Value>>>>,
pub difficulty: Arc<Mutex<f64>>,
pub next_difficulty: Arc<Mutex<Option<f64>>>,
pub options: Arc<MinerOptions>,
pub needs_ban: Arc<Mutex<bool>>,
pub state: Arc<Mutex<State>>,
pub stop_source: Arc<Mutex<Option<StopSource>>>,
pub stop_token: StopToken,
pub miner_list: MinerList,
}
/// Variable-difficulty settings shared by a connection and its workers.
///
/// `Connection::new` fills this from a `VarDiffConfig`, except for
/// `max_delta`, which it sets to `1.0`.
// NOTE(review): units are not visible in this file — `retarget_time` and
// `target_time` are presumably seconds; confirm against the var-diff code.
#[derive(Default, Debug)]
pub struct MinerOptions {
    /// Copied from `VarDiffConfig::retarget_time`.
    pub retarget_time: i64,
    /// Copied from `VarDiffConfig::target_time`.
    pub target_time: f64,
    /// Copied from `VarDiffConfig::minimum_difficulty`.
    pub min_diff: f64,
    /// Copied from `VarDiffConfig::maximum_difficulty`.
    pub max_diff: f64,
    /// Fixed at `1.0` by `Connection::new`.
    pub max_delta: f64,
    /// Copied from `VarDiffConfig::variance_percent`.
    pub variance_percent: f64,
}
impl<State: Clone + Send + Sync + 'static> Connection<State> {
pub fn new(
session_id: u32,
sender: UnboundedSender<String>,
upstream_sender: UnboundedSender<String>,
upstream_receiver: UnboundedReceiver<serde_json::map::Map<String, serde_json::Value>>,
initial_difficulty: f64,
var_diff_config: VarDiffConfig,
state: State,
) -> Self {
let id = Uuid::new_v4();
debug!("Accepting new miner. ID: {}", &id);
let options = MinerOptions {
retarget_time: var_diff_config.retarget_time,
target_time: var_diff_config.target_time,
min_diff: var_diff_config.minimum_difficulty,
max_diff: var_diff_config.maximum_difficulty,
max_delta: 1.0, variance_percent: var_diff_config.variance_percent,
};
let stop_source = StopSource::new();
let stop_token = stop_source.token();
Connection {
id,
session_id,
user_info: Arc::new(Mutex::new(UserInfo {
account_id: 0,
mining_account: 0,
worker_name: None,
})),
info: Arc::new(Mutex::new(ConnectionInfo::new())),
sender: Arc::new(Mutex::new(sender)),
upstream_sender: Arc::new(Mutex::new(upstream_sender)),
upstream_receiver: Arc::new(Mutex::new(upstream_receiver)),
difficulty: Arc::new(Mutex::new(initial_difficulty)),
next_difficulty: Arc::new(Mutex::new(None)),
options: Arc::new(options),
needs_ban: Arc::new(Mutex::new(false)),
state: Arc::new(Mutex::new(state)),
stop_source: Arc::new(Mutex::new(Some(stop_source))),
stop_token,
miner_list: MinerList::new(),
}
}
pub async fn is_disconnected(&self) -> bool {
self.info.lock().await.state == ConnectionState::Disconnect
}
pub async fn send<T: Serialize>(&self, message: T) -> Result<()> {
let msg_string = serde_json::to_string(&message)?;
trace!("Sending message: {}", msg_string.clone());
let mut sender = self.sender.lock().await;
sender.send(msg_string).await?;
Ok(())
}
pub async fn upstream_send<T: Serialize>(&self, message: T) -> Result<()> {
let msg_string = serde_json::to_string(&message)?;
debug!("Sending message: {}", msg_string.clone());
let mut upstream_sender = self.upstream_sender.lock().await;
upstream_sender.send(msg_string).await?;
Ok(())
}
pub async fn upstream_result(&self) -> Result<(serde_json::Value, serde_json::Value)> {
let mut upstream_receiver = self.upstream_receiver.lock().await;
let values = match upstream_receiver.next().await {
Some(values) => values,
None => return Ok((json!(false), serde_json::Value::Null)),
};
Ok((values["result"].clone(), values["error"].clone()))
}
pub async fn shutdown(&self) {
self.info.lock().await.state = ConnectionState::Disconnect;
*self.stop_source.lock().await = None;
}
pub async fn disconnect(&self) {
self.info.lock().await.state = ConnectionState::Disconnect;
}
pub async fn ban(&self) {
*self.needs_ban.lock().await = true;
self.disconnect().await;
}
pub async fn needs_ban(&self) -> bool {
*self.needs_ban.lock().await
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn register_worker(
&self,
session_id: u16,
client_agent: &str,
worker_name: &str,
worker_id: Uuid,
) {
let worker = Miner::new(
worker_id,
Some(client_agent.to_owned()),
Some(worker_name.to_owned()),
Buffer::from(session_id.to_le_bytes().to_vec()),
self.options.clone(),
);
self.miner_list.add_miner(session_id, worker).await;
}
pub async fn unregister_worker(&self, session_id: u16) {
self.miner_list.remove_miner(session_id).await;
}
pub async fn get_worker_list(&self) -> MinerList {
self.miner_list.clone()
}
pub async fn get_worker_by_session_id(&self, session_id: u16) -> Option<Miner> {
self.miner_list.get_miner_by_id(session_id).await
}
pub async fn update_worker_by_session_id(&self, session_id: u16, miner: Miner) {
self.miner_list
.update_miner_by_session_id(session_id, miner)
.await;
}
pub async fn set_user_info(
&self,
account_id: i32,
mining_account_id: i32,
worker_name: Option<String>,
) {
let mut user_info = self.user_info.lock().await;
user_info.account_id = account_id;
user_info.mining_account = mining_account_id;
user_info.worker_name = worker_name;
}
pub async fn get_user_info(&self) -> UserInfo {
self.user_info.lock().await.clone()
}
pub async fn set_client(&self, client: &str) {
let mut agent = false;
let mut long_timeout = false;
if client.starts_with("btccom-agent/") {
agent = true;
long_timeout = true;
}
let mut info = self.info.lock().await;
info.agent = agent;
info.client = Some(client.to_string());
info.is_long_timeout = long_timeout;
}
pub async fn get_connection_info(&self) -> ConnectionInfo {
self.info.lock().await.clone()
}
pub fn get_session_id(&self) -> u32 {
self.session_id
}
pub async fn authorized(&self) -> bool {
self.info.lock().await.authorized
}
pub async fn authorize(&self) {
self.info.lock().await.authorized = true;
}
pub async fn subscribed(&self) -> bool {
self.info.lock().await.subscribed
}
pub async fn subscribe(&self) {
self.info.lock().await.subscribed = true;
}
pub async fn set_difficulty(&self, difficulty: f64) {
*self.difficulty.lock().await = difficulty;
}
pub async fn get_difficulty(&self) -> f64 {
*self.difficulty.lock().await
}
pub async fn get_state(&self) -> State {
self.state.lock().await.clone()
}
pub async fn set_state(&self, state: State) {
*self.state.lock().await = state;
}
pub async fn update_difficulty(&self) -> Option<f64> {
let next_difficulty = *self.next_difficulty.lock().await;
if let Some(next_difficulty) = next_difficulty {
*self.difficulty.lock().await = next_difficulty;
*self.next_difficulty.lock().await = None;
Some(next_difficulty)
} else {
None
}
}
pub fn get_stop_token(&self) -> StopToken {
self.stop_token.clone()
}
}