use super::super::downstream::DownstreamMiningNode as Downstream;
use super::super::{
error::{
Error::{CodecNoise, PoisonLock, UpstreamIncoming},
ProxyResult,
},
status,
upstream_sv2::{EitherFrame, Message, StdFrame},
PoolChangerTrigger,
};
use async_channel::{Receiver, Sender};
use binary_sv2::{Seq0255, U256};
use codec_sv2::{Frame, HandshakeRole, Initiator};
use error_handling::handle_result;
use key_utils::Secp256k1PublicKey;
use network_helpers_sv2::noise_connection_tokio::Connection;
use roles_logic_sv2::{
channel_logic::channel_factory::PoolChannelFactory,
common_messages_sv2::{Protocol, SetupConnection},
common_properties::{IsMiningUpstream, IsUpstream},
handlers::{
common::{ParseUpstreamCommonMessages, SendTo as SendToCommon},
mining::{ParseUpstreamMiningMessages, SendTo},
},
job_declaration_sv2::DeclareMiningJob,
mining_sv2::{ExtendedExtranonce, Extranonce, SetCustomMiningJob},
parsers::{Mining, MiningDeviceMessages, PoolMessages},
routing_logic::{CommonRoutingLogic, MiningRoutingLogic, NoRouting},
selectors::NullDownstreamMiningSelector,
utils::{Id, Mutex},
Error as RolesLogicError,
};
use std::{collections::HashMap, net::SocketAddr, sync::Arc, thread::sleep, time::Duration};
use tokio::{net::TcpStream, task, task::AbortHandle};
use tracing::{error, info, warn};
use std::collections::VecDeque;
#[derive(Debug)]
struct CircularBuffer {
buffer: VecDeque<(u64, u32)>,
capacity: usize,
}
impl CircularBuffer {
fn new(capacity: usize) -> Self {
CircularBuffer {
buffer: VecDeque::with_capacity(capacity),
capacity,
}
}
fn insert(&mut self, key: u64, value: u32) {
if self.buffer.len() == self.capacity {
self.buffer.pop_front();
}
self.buffer.push_back((key, value));
}
fn get(&self, id: u64) -> Option<u32> {
self.buffer
.iter()
.find_map(|&(key, value)| if key == id { Some(value) } else { None })
}
}
impl std::default::Default for CircularBuffer {
fn default() -> Self {
Self::new(10)
}
}
#[derive(Debug, Default)]
struct TemplateToJobId {
template_id_to_job_id: CircularBuffer,
request_id_to_template_id: HashMap<u32, u64>,
}
impl TemplateToJobId {
fn register_template_id(&mut self, template_id: u64, request_id: u32) {
self.request_id_to_template_id
.insert(request_id, template_id);
}
fn register_job_id(&mut self, template_id: u64, job_id: u32) {
self.template_id_to_job_id.insert(template_id, job_id);
}
fn get_job_id(&mut self, template_id: u64) -> Option<u32> {
self.template_id_to_job_id.get(template_id)
}
fn take_template_id(&mut self, request_id: u32) -> Option<u64> {
self.request_id_to_template_id.remove(&request_id)
}
fn new() -> Self {
Self::default()
}
}
#[derive(Debug)]
pub struct Upstream {
channel_id: Option<u32>,
tx_status: status::Sender,
pub min_extranonce_size: u16,
pub upstream_extranonce1_size: usize,
pub pool_signature: String,
pub receiver: Receiver<EitherFrame>,
pub sender: Sender<EitherFrame>,
pub downstream: Option<Arc<Mutex<Downstream>>>,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
pool_chaneger_trigger: Arc<Mutex<PoolChangerTrigger>>,
channel_factory: Option<PoolChannelFactory>,
template_to_job_id: TemplateToJobId,
req_ids: Id,
}
impl Upstream {
pub async fn send(self_: &Arc<Mutex<Self>>, sv2_frame: StdFrame) -> ProxyResult<'static, ()> {
let sender = self_
.safe_lock(|s| s.sender.clone())
.map_err(|_| PoisonLock)?;
let either_frame = sv2_frame.into();
sender.send(either_frame).await.map_err(|e| {
super::super::error::Error::ChannelErrorSender(
super::super::error::ChannelSendError::General(e.to_string()),
)
})?;
Ok(())
}
#[cfg_attr(feature = "cargo-clippy", allow(clippy::too_many_arguments))]
pub async fn new(
address: SocketAddr,
authority_public_key: Secp256k1PublicKey,
min_extranonce_size: u16,
pool_signature: String,
tx_status: status::Sender,
task_collector: Arc<Mutex<Vec<AbortHandle>>>,
pool_chaneger_trigger: Arc<Mutex<PoolChangerTrigger>>,
) -> ProxyResult<'static, Arc<Mutex<Self>>> {
let socket = loop {
match TcpStream::connect(address).await {
Ok(socket) => break socket,
Err(e) => {
error!(
"Failed to connect to Upstream role at {}, retrying in 5s: {}",
address, e
);
sleep(Duration::from_secs(5));
}
}
};
let pub_key: Secp256k1PublicKey = authority_public_key;
let initiator = Initiator::from_raw_k(pub_key.into_bytes())?;
info!(
"PROXY SERVER - ACCEPTING FROM UPSTREAM: {}",
socket.peer_addr()?
);
let (receiver, sender, _, _) = Connection::new(socket, HandshakeRole::Initiator(initiator))
.await
.expect("Failed to create connection");
Ok(Arc::new(Mutex::new(Self {
channel_id: None,
min_extranonce_size,
upstream_extranonce1_size: 16, pool_signature,
tx_status,
receiver,
sender,
downstream: None,
task_collector,
pool_chaneger_trigger,
channel_factory: None,
template_to_job_id: TemplateToJobId::new(),
req_ids: Id::new(),
})))
}
pub async fn setup_connection(
self_: Arc<Mutex<Self>>,
min_version: u16,
max_version: u16,
) -> ProxyResult<'static, ()> {
let setup_connection = Self::get_setup_connection_message(min_version, max_version, true)?;
let sv2_frame: StdFrame = Message::Common(setup_connection.into()).try_into()?;
Self::send(&self_, sv2_frame).await?;
let recv = self_
.safe_lock(|s| s.receiver.clone())
.map_err(|_| PoisonLock)?;
let mut incoming: StdFrame = match recv.recv().await {
Ok(frame) => frame.try_into()?,
Err(e) => {
error!("Upstream connection closed: {}", e);
return Err(CodecNoise(
codec_sv2::noise_sv2::Error::ExpectedIncomingHandshakeMessage,
));
}
};
let message_type = if let Some(header) = incoming.get_header() {
header.msg_type()
} else {
return Err(framing_sv2::Error::ExpectedHandshakeFrame.into());
};
let payload = incoming.payload();
ParseUpstreamCommonMessages::handle_message_common(
self_.clone(),
message_type,
payload,
CommonRoutingLogic::None,
)?;
Ok(())
}
#[allow(clippy::too_many_arguments)]
pub async fn set_custom_jobs(
self_: &Arc<Mutex<Self>>,
declare_mining_job: DeclareMiningJob<'static>,
set_new_prev_hash: roles_logic_sv2::template_distribution_sv2::SetNewPrevHash<'static>,
merkle_path: Seq0255<'static, U256<'static>>,
signed_token: binary_sv2::B0255<'static>,
coinbase_tx_version: u32,
coinbase_prefix: binary_sv2::B0255<'static>,
coinbase_tx_input_n_sequence: u32,
coinbase_tx_value_remaining: u64,
coinbase_tx_outs: Vec<u8>,
coinbase_tx_locktime: u32,
template_id: u64,
) -> ProxyResult<'static, ()> {
info!("Sending set custom mining job");
let request_id = self_.safe_lock(|s| s.req_ids.next()).unwrap();
let channel_id = loop {
if let Some(id) = self_.safe_lock(|s| s.channel_id).unwrap() {
break id;
};
tokio::task::yield_now().await;
};
let updated_timestamp = std::time::SystemTime::now()
.duration_since(std::time::UNIX_EPOCH)
.unwrap()
.as_secs() as u32;
let to_send = SetCustomMiningJob {
channel_id,
request_id,
token: signed_token,
version: declare_mining_job.version,
prev_hash: set_new_prev_hash.prev_hash,
min_ntime: updated_timestamp,
nbits: set_new_prev_hash.n_bits,
coinbase_tx_version,
coinbase_prefix,
coinbase_tx_input_n_sequence,
coinbase_tx_value_remaining,
coinbase_tx_outputs: coinbase_tx_outs.try_into().unwrap(),
coinbase_tx_locktime,
merkle_path,
extranonce_size: 0,
};
let message = PoolMessages::Mining(Mining::SetCustomMiningJob(to_send));
let frame: StdFrame = message.try_into().unwrap();
self_
.safe_lock(|s| {
s.template_to_job_id
.register_template_id(template_id, request_id)
})
.unwrap();
Self::send(self_, frame).await
}
#[allow(clippy::result_large_err)]
pub fn parse_incoming(self_: Arc<Mutex<Self>>) -> ProxyResult<'static, ()> {
let (recv, tx_status) = self_
.safe_lock(|s| (s.receiver.clone(), s.tx_status.clone()))
.map_err(|_| PoisonLock)?;
let main_task = {
let self_ = self_.clone();
task::spawn(async move {
loop {
let incoming = handle_result!(tx_status, recv.recv().await);
let mut incoming: StdFrame = handle_result!(tx_status, incoming.try_into());
let message_type =
incoming
.get_header()
.ok_or(super::super::error::Error::FramingSv2(
framing_sv2::Error::ExpectedSv2Frame,
));
let message_type = handle_result!(tx_status, message_type).msg_type();
let payload = incoming.payload();
let routing_logic = MiningRoutingLogic::None;
let next_message_to_send = Upstream::handle_message_mining(
self_.clone(),
message_type,
payload,
routing_logic,
);
match next_message_to_send {
Ok(SendTo::RelaySameMessageToRemote(downstream_mutex)) => {
let sv2_frame: codec_sv2::Sv2Frame<
MiningDeviceMessages,
buffer_sv2::Slice,
> = incoming.map(|payload| payload.try_into().unwrap());
Downstream::send(&downstream_mutex, sv2_frame)
.await
.unwrap();
}
Ok(SendTo::None(_)) => (),
Ok(_) => unreachable!(),
Err(e) => {
let status = status::Status {
state: status::State::UpstreamShutdown(UpstreamIncoming(e)),
};
error!(
"TERMINATING: Error handling pool role message: {:?}",
status
);
if let Err(e) = tx_status.send(status).await {
error!("Status channel down: {:?}", e);
}
break;
}
}
}
})
};
self_
.safe_lock(|s| {
s.task_collector
.safe_lock(|c| c.push(main_task.abort_handle()))
.unwrap()
})
.unwrap();
Ok(())
}
#[allow(clippy::result_large_err)]
fn get_setup_connection_message(
min_version: u16,
max_version: u16,
is_work_selection_enabled: bool,
) -> ProxyResult<'static, SetupConnection<'static>> {
let endpoint_host = "0.0.0.0".to_string().into_bytes().try_into()?;
let vendor = String::new().try_into()?;
let hardware_version = String::new().try_into()?;
let firmware = String::new().try_into()?;
let device_id = String::new().try_into()?;
let flags = match is_work_selection_enabled {
false => 0b0000_0000_0000_0000_0000_0000_0000_0100,
true => 0b0000_0000_0000_0000_0000_0000_0000_0110,
};
Ok(SetupConnection {
protocol: Protocol::MiningProtocol,
min_version,
max_version,
flags,
endpoint_host,
endpoint_port: 50,
vendor,
hardware_version,
firmware,
device_id,
})
}
pub async fn take_channel_factory(self_: Arc<Mutex<Self>>) -> PoolChannelFactory {
while self_.safe_lock(|s| s.channel_factory.is_none()).unwrap() {
tokio::task::yield_now().await;
}
self_
.safe_lock(|s| {
let mut factory = None;
std::mem::swap(&mut s.channel_factory, &mut factory);
factory.unwrap()
})
.unwrap()
}
pub async fn get_job_id(self_: &Arc<Mutex<Self>>, template_id: u64) -> u32 {
loop {
if let Some(id) = self_
.safe_lock(|s| s.template_to_job_id.get_job_id(template_id))
.unwrap()
{
return id;
}
tokio::task::yield_now().await;
}
}
}
impl IsUpstream<Downstream, NullDownstreamMiningSelector> for Upstream {
fn get_version(&self) -> u16 {
todo!()
}
fn get_flags(&self) -> u32 {
todo!()
}
fn get_supported_protocols(&self) -> Vec<Protocol> {
todo!()
}
fn get_id(&self) -> u32 {
todo!()
}
fn get_mapper(&mut self) -> Option<&mut roles_logic_sv2::common_properties::RequestIdMapper> {
todo!()
}
fn get_remote_selector(&mut self) -> &mut NullDownstreamMiningSelector {
todo!()
}
}
impl IsMiningUpstream<Downstream, NullDownstreamMiningSelector> for Upstream {
fn total_hash_rate(&self) -> u64 {
todo!()
}
fn add_hash_rate(&mut self, _to_add: u64) {
todo!()
}
fn get_opened_channels(
&mut self,
) -> &mut Vec<roles_logic_sv2::common_properties::UpstreamChannel> {
todo!()
}
fn update_channels(&mut self, _c: roles_logic_sv2::common_properties::UpstreamChannel) {
todo!()
}
}
impl ParseUpstreamCommonMessages<NoRouting> for Upstream {
fn handle_setup_connection_success(
&mut self,
_: roles_logic_sv2::common_messages_sv2::SetupConnectionSuccess,
) -> Result<SendToCommon, RolesLogicError> {
Ok(SendToCommon::None(None))
}
fn handle_setup_connection_error(
&mut self,
_: roles_logic_sv2::common_messages_sv2::SetupConnectionError,
) -> Result<SendToCommon, RolesLogicError> {
todo!()
}
fn handle_channel_endpoint_changed(
&mut self,
_: roles_logic_sv2::common_messages_sv2::ChannelEndpointChanged,
) -> Result<SendToCommon, RolesLogicError> {
todo!()
}
}
impl ParseUpstreamMiningMessages<Downstream, NullDownstreamMiningSelector, NoRouting> for Upstream {
fn get_channel_type(&self) -> roles_logic_sv2::handlers::mining::SupportedChannelTypes {
roles_logic_sv2::handlers::mining::SupportedChannelTypes::Extended
}
fn is_work_selection_enabled(&self) -> bool {
true
}
fn handle_open_standard_mining_channel_success(
&mut self,
_m: roles_logic_sv2::mining_sv2::OpenStandardMiningChannelSuccess,
_remote: Option<Arc<Mutex<Downstream>>>,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
panic!("Standard Mining Channels are not used in Translator Proxy")
}
fn handle_open_extended_mining_channel_success(
&mut self,
m: roles_logic_sv2::mining_sv2::OpenExtendedMiningChannelSuccess,
) -> Result<SendTo<Downstream>, RolesLogicError> {
info!("Receive open extended mining channel success");
let ids = Arc::new(Mutex::new(roles_logic_sv2::utils::GroupId::new()));
let pool_signature = self.pool_signature.clone();
let prefix_len = m.extranonce_prefix.to_vec().len();
let self_len = 0;
let total_len = prefix_len + m.extranonce_size as usize;
let range_0 = 0..prefix_len;
let range_1 = prefix_len..prefix_len + self_len;
let range_2 = prefix_len + self_len..total_len;
let extranonces = ExtendedExtranonce::new(range_0, range_1, range_2);
let creator = roles_logic_sv2::job_creator::JobsCreators::new(total_len as u8);
let share_per_min = 1.0;
let channel_kind =
roles_logic_sv2::channel_logic::channel_factory::ExtendedChannelKind::ProxyJd {
upstream_target: m.target.clone().into(),
};
let mut channel_factory = PoolChannelFactory::new(
ids,
extranonces,
creator,
share_per_min,
channel_kind,
vec![],
pool_signature,
);
let extranonce: Extranonce = m
.extranonce_prefix
.into_static()
.to_vec()
.try_into()
.unwrap();
self.channel_id = Some(m.channel_id);
channel_factory
.replicate_upstream_extended_channel_only_jd(
m.target.into_static(),
extranonce,
m.channel_id,
m.extranonce_size,
)
.expect("Impossible to open downstream channel");
self.channel_factory = Some(channel_factory);
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_open_mining_channel_error(
&mut self,
_: roles_logic_sv2::mining_sv2::OpenMiningChannelError,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_update_channel_error(
&mut self,
_m: roles_logic_sv2::mining_sv2::UpdateChannelError,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_close_channel(
&mut self,
_m: roles_logic_sv2::mining_sv2::CloseChannel,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_set_extranonce_prefix(
&mut self,
_: roles_logic_sv2::mining_sv2::SetExtranoncePrefix,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_submit_shares_success(
&mut self,
_m: roles_logic_sv2::mining_sv2::SubmitSharesSuccess,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_submit_shares_error(
&mut self,
_m: roles_logic_sv2::mining_sv2::SubmitSharesError,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
self.pool_chaneger_trigger
.safe_lock(|t| t.start(self.tx_status.clone()))
.unwrap();
Ok(SendTo::None(None))
}
fn handle_new_mining_job(
&mut self,
_m: roles_logic_sv2::mining_sv2::NewMiningJob,
) -> Result<SendTo<Downstream>, RolesLogicError> {
panic!("Standard Mining Channels are not used in Translator Proxy")
}
fn handle_new_extended_mining_job(
&mut self,
_: roles_logic_sv2::mining_sv2::NewExtendedMiningJob,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
warn!("Extended job received from upstream, proxy ignore it, and use the one declared by JOB DECLARATOR");
Ok(SendTo::None(None))
}
fn handle_set_new_prev_hash(
&mut self,
_: roles_logic_sv2::mining_sv2::SetNewPrevHash,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
warn!("SNPH received from upstream, proxy ignore it, and use the one declared by JOB DECLARATOR");
Ok(SendTo::None(None))
}
fn handle_set_custom_mining_job_success(
&mut self,
m: roles_logic_sv2::mining_sv2::SetCustomMiningJobSuccess,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
info!("Set custom mining job success {}", m.job_id);
if let Some(template_id) = self.template_to_job_id.take_template_id(m.request_id) {
self.template_to_job_id
.register_job_id(template_id, m.job_id);
Ok(SendTo::None(None))
} else {
error!("Attention received a SetupConnectionSuccess with unknown request_id");
Ok(SendTo::None(None))
}
}
fn handle_set_custom_mining_job_error(
&mut self,
_m: roles_logic_sv2::mining_sv2::SetCustomMiningJobError,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
todo!()
}
fn handle_set_target(
&mut self,
m: roles_logic_sv2::mining_sv2::SetTarget,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
if let Some(factory) = self.channel_factory.as_mut() {
factory.update_target_for_channel(m.channel_id, m.maximum_target.clone().into());
factory.set_target(&mut m.maximum_target.clone().into());
}
if let Some(downstream) = &self.downstream {
let _ = downstream.safe_lock(|d| {
let factory = d.status.get_channel();
factory.set_target(&mut m.maximum_target.clone().into());
factory.update_target_for_channel(m.channel_id, m.maximum_target.into());
});
}
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
fn handle_reconnect(
&mut self,
_m: roles_logic_sv2::mining_sv2::Reconnect,
) -> Result<roles_logic_sv2::handlers::mining::SendTo<Downstream>, RolesLogicError> {
Ok(SendTo::RelaySameMessageToRemote(
self.downstream.as_ref().unwrap().clone(),
))
}
}