use crate::{config::VarDiffConfig, format_difficulty, Miner, MinerList, Result};
use async_std::sync::{Arc, Mutex, RwLock};
use extended_primitives::Buffer;
use futures::{
channel::mpsc::{UnboundedReceiver, UnboundedSender},
SinkExt, StreamExt,
};
use log::{debug, trace};
use serde::Serialize;
use serde_json::json;
use std::time::{Duration, SystemTime};
use stop_token::{StopSource, StopToken};
use uuid::Uuid;
#[derive(Debug, Clone)]
pub struct UserInfo {
pub account_id: i32,
pub mining_account: i32,
pub worker_name: Option<String>,
}
#[derive(Debug, Clone)]
pub struct ConnectionInfo {
pub agent: bool,
pub authorized: bool,
pub subscribed: bool,
pub client: Option<String>,
pub session_start: SystemTime,
pub state: ConnectionState,
pub is_long_timeout: bool,
}
impl Default for ConnectionInfo {
fn default() -> Self {
Self::new()
}
}
impl ConnectionInfo {
pub fn new() -> Self {
ConnectionInfo {
agent: false,
authorized: false,
subscribed: false,
client: None,
session_start: SystemTime::now(),
state: ConnectionState::Connected,
is_long_timeout: false,
}
}
}
#[derive(PartialEq, Eq, Debug, Clone)]
pub enum ConnectionState {
Connected,
Disconnect,
}
#[derive(Debug)]
pub enum SendInformation {
Json(String),
Text(String),
Raw(Buffer),
}
#[derive(Debug)]
pub struct Connection<State> {
pub id: Uuid,
pub session_id: u32,
pub info: Arc<RwLock<ConnectionInfo>>,
pub user_info: Arc<Mutex<UserInfo>>,
pub sender: Arc<Mutex<UnboundedSender<SendInformation>>>,
pub upstream_sender: Arc<Mutex<UnboundedSender<String>>>,
pub upstream_receiver:
Arc<Mutex<UnboundedReceiver<serde_json::map::Map<String, serde_json::Value>>>>,
pub difficulty: Arc<Mutex<u64>>,
pub previous_difficulty: Arc<Mutex<u64>>,
pub next_difficulty: Arc<Mutex<Option<u64>>>,
pub options: Arc<MinerOptions>,
pub needs_ban: Arc<Mutex<bool>>,
pub state: Arc<Mutex<State>>,
pub stop_source: Arc<Mutex<Option<StopSource>>>,
pub stop_token: StopToken,
pub connection_miner: Arc<Mutex<Option<Miner>>>,
pub miner_list: MinerList,
}
#[derive(Debug, Default)]
pub struct MinerOptions {
pub retarget_time: u64, pub target_time: u64, pub min_diff: u64,
pub max_diff: u64,
pub max_delta: f64,
pub variance_percent: f64,
}
impl<State: Clone + Send + Sync + 'static> Connection<State> {
pub fn new(
session_id: u32,
sender: UnboundedSender<SendInformation>,
upstream_sender: UnboundedSender<String>,
upstream_receiver: UnboundedReceiver<serde_json::map::Map<String, serde_json::Value>>,
initial_difficulty: u64,
var_diff_config: VarDiffConfig,
state: State,
) -> Self {
let id = Uuid::new_v4();
debug!("Accepting new miner. ID: {}", &id);
let options = MinerOptions {
retarget_time: var_diff_config.retarget_time,
target_time: var_diff_config.target_time,
min_diff: var_diff_config.minimum_difficulty,
max_diff: var_diff_config.maximum_difficulty,
max_delta: 1.0, variance_percent: var_diff_config.variance_percent,
};
let stop_source = StopSource::new();
let stop_token = stop_source.token();
Connection {
id,
session_id,
user_info: Arc::new(Mutex::new(UserInfo {
account_id: 0,
mining_account: 0,
worker_name: None,
})),
info: Arc::new(RwLock::new(ConnectionInfo::new())),
sender: Arc::new(Mutex::new(sender)),
upstream_sender: Arc::new(Mutex::new(upstream_sender)),
upstream_receiver: Arc::new(Mutex::new(upstream_receiver)),
difficulty: Arc::new(Mutex::new(initial_difficulty)),
previous_difficulty: Arc::new(Mutex::new(initial_difficulty)),
next_difficulty: Arc::new(Mutex::new(None)),
options: Arc::new(options),
needs_ban: Arc::new(Mutex::new(false)),
state: Arc::new(Mutex::new(state)),
stop_source: Arc::new(Mutex::new(Some(stop_source))),
stop_token,
miner_list: MinerList::new(),
connection_miner: Arc::new(Mutex::new(None)),
}
}
pub async fn is_disconnected(&self) -> bool {
self.info.read().await.state == ConnectionState::Disconnect
}
pub async fn send<T: Serialize>(&self, message: T) -> Result<()> {
let msg_string = serde_json::to_string(&message)?;
trace!("Sending message: {}", msg_string.clone());
let mut sender = self.sender.lock().await;
sender.send(SendInformation::Json(msg_string)).await?;
Ok(())
}
pub async fn send_text(&self, message: String) -> Result<()> {
let mut sender = self.sender.lock().await;
sender.send(SendInformation::Text(message)).await?;
Ok(())
}
pub async fn send_raw(&self, message: Buffer) -> Result<()> {
let mut sender = self.sender.lock().await;
sender.send(SendInformation::Raw(message)).await?;
Ok(())
}
pub async fn upstream_send<T: Serialize>(&self, message: T) -> Result<()> {
let msg_string = serde_json::to_string(&message)?;
debug!("Sending message: {}", msg_string.clone());
let mut upstream_sender = self.upstream_sender.lock().await;
upstream_sender.send(msg_string).await?;
Ok(())
}
pub async fn upstream_result(&self) -> Result<(serde_json::Value, serde_json::Value)> {
let mut upstream_receiver = self.upstream_receiver.lock().await;
let values = match upstream_receiver.next().await {
Some(values) => values,
None => return Ok((json!(false), serde_json::Value::Null)),
};
Ok((values["result"].clone(), values["error"].clone()))
}
pub async fn shutdown(&self) {
self.info.write().await.state = ConnectionState::Disconnect;
*self.stop_source.lock().await = None;
}
pub async fn disconnect(&self) {
self.info.write().await.state = ConnectionState::Disconnect;
}
pub async fn ban(&self) {
*self.needs_ban.lock().await = true;
self.disconnect().await;
}
pub async fn needs_ban(&self) -> bool {
*self.needs_ban.lock().await
}
pub fn id(&self) -> Uuid {
self.id
}
pub async fn add_main_worker(&self, worker_id: Uuid) {
let conn_info = self.get_connection_info().await;
let user_info = self.get_user_info().await;
let session_id = self.session_id;
let worker = Miner::new(
worker_id,
conn_info.client.to_owned(),
user_info.worker_name.to_owned(),
Buffer::from(session_id.to_le_bytes().to_vec()),
self.options.clone(),
format_difficulty(*self.difficulty.lock().await),
);
*self.connection_miner.lock().await = Some(worker);
}
pub async fn get_main_worker(&self) -> Option<Miner> {
self.connection_miner.lock().await.clone()
}
pub async fn register_worker(
&self,
session_id: u32,
client_agent: &str,
worker_name: &str,
worker_id: Uuid,
) {
let worker = Miner::new(
worker_id,
Some(client_agent.to_owned()),
Some(worker_name.to_owned()),
Buffer::from(session_id.to_le_bytes().to_vec()),
self.options.clone(),
format_difficulty(*self.difficulty.lock().await),
);
self.miner_list.add_miner(session_id, worker).await;
}
pub async fn unregister_worker(&self, session_id: u32) -> Option<Miner> {
self.miner_list.remove_miner(session_id).await
}
pub async fn get_worker_list(&self) -> MinerList {
self.miner_list.clone()
}
pub async fn get_worker_by_session_id(&self, session_id: u32) -> Option<Miner> {
self.miner_list.get_miner_by_id(session_id).await
}
pub async fn update_worker_by_session_id(&self, session_id: u32, miner: Miner) {
self.miner_list
.update_miner_by_session_id(session_id, miner)
.await;
}
pub async fn set_user_info(
&self,
account_id: i32,
mining_account_id: i32,
worker_name: Option<String>,
) {
let mut user_info = self.user_info.lock().await;
user_info.account_id = account_id;
user_info.mining_account = mining_account_id;
user_info.worker_name = worker_name;
}
pub async fn get_user_info(&self) -> UserInfo {
self.user_info.lock().await.clone()
}
pub async fn set_client(&self, client: &str) {
let mut agent = false;
let mut long_timeout = false;
if client.starts_with("btccom-agent/") {
agent = true;
long_timeout = true;
}
let mut info = self.info.write().await;
info.agent = agent;
info.client = Some(client.to_string());
info.is_long_timeout = long_timeout;
}
pub async fn get_connection_info(&self) -> ConnectionInfo {
self.info.read().await.clone()
}
pub async fn is_long_timeout(&self) -> bool {
self.info.read().await.is_long_timeout
}
pub async fn timeout(&self) -> Duration {
let info = self.info.read().await;
if info.is_long_timeout {
Duration::from_secs(86400 * 7)
} else if info.subscribed && info.authorized {
Duration::from_secs(600)
} else {
Duration::from_secs(15)
}
}
pub fn get_session_id(&self) -> u32 {
self.session_id
}
pub async fn authorized(&self) -> bool {
self.info.read().await.authorized
}
pub async fn authorize(&self) {
self.info.write().await.authorized = true;
}
pub async fn subscribed(&self) -> bool {
self.info.read().await.subscribed
}
pub async fn subscribe(&self) {
self.info.write().await.subscribed = true;
}
pub async fn is_agent(&self) -> bool {
self.info.read().await.agent
}
pub async fn set_difficulty(&self, difficulty: u64) {
let miner = self.connection_miner.lock().await.clone();
if let Some(connection_miner) = miner {
connection_miner
.set_difficulty(format_difficulty(difficulty))
.await;
} else {
*self.difficulty.lock().await = format_difficulty(difficulty);
}
}
pub async fn get_difficulty(&self) -> u64 {
let miner = self.connection_miner.lock().await.clone();
if let Some(connection_miner) = miner {
connection_miner.current_difficulty().await
} else {
*self.difficulty.lock().await
}
}
pub async fn get_previous_difficulty(&self) -> u64 {
let miner = self.connection_miner.lock().await.clone();
if let Some(connection_miner) = miner {
connection_miner.previous_difficulty().await
} else {
*self.previous_difficulty.lock().await
}
}
pub async fn get_state(&self) -> State {
self.state.lock().await.clone()
}
pub async fn set_state(&self, state: State) {
*self.state.lock().await = state;
}
pub async fn update_difficulty(&self) -> Option<u64> {
let miner = self.connection_miner.lock().await.clone();
if let Some(connection_miner) = miner {
connection_miner.update_difficulty().await
} else {
None
}
}
pub fn get_stop_token(&self) -> StopToken {
self.stop_token.clone()
}
}