#![allow(special_module_name)]
mod args;
mod lib;
use lib::{
error::{Error, ProxyResult},
job_declarator::JobDeclarator,
proxy_config::ProxyConfig,
status,
template_receiver::TemplateRx,
PoolChangerTrigger,
};
use args::Args;
use async_channel::{bounded, unbounded};
use futures::{select, FutureExt};
use roles_logic_sv2::utils::Mutex;
use std::{
net::{IpAddr, SocketAddr},
str::FromStr,
sync::Arc,
time::Duration,
};
use tokio::task::AbortHandle;
use tracing::{error, info};
#[allow(clippy::result_large_err)]
fn process_cli_args<'a>() -> ProxyResult<'a, ProxyConfig> {
let args = match Args::from_args() {
Ok(cfg) => cfg,
Err(help) => {
error!("{}", help);
return Err(Error::BadCliArgs);
}
};
let config_file = std::fs::read_to_string(args.config_path)?;
Ok(toml::from_str::<ProxyConfig>(&config_file)?)
}
#[tokio::main]
async fn main() {
tracing_subscriber::fmt::init();
let mut upstream_index = 0;
let mut interrupt_signal_future = Box::pin(tokio::signal::ctrl_c().fuse());
let (tx_status, rx_status) = unbounded();
let task_collector = Arc::new(Mutex::new(vec![]));
let proxy_config = match process_cli_args() {
Ok(p) => p,
Err(_) => return,
};
loop {
{
let task_collector = task_collector.clone();
let tx_status = tx_status.clone();
if let Some(upstream) = proxy_config.upstreams.get(upstream_index) {
let initialize = initialize_jd(
tx_status.clone(),
task_collector,
upstream.clone(),
proxy_config.timeout,
);
tokio::task::spawn(initialize);
} else {
let initialize = initialize_jd_as_solo_miner(
tx_status.clone(),
task_collector,
proxy_config.timeout,
);
tokio::task::spawn(initialize);
}
}
loop {
let task_status = select! {
task_status = rx_status.recv().fuse() => task_status,
interrupt_signal = interrupt_signal_future => {
match interrupt_signal {
Ok(()) => {
info!("Interrupt received");
},
Err(err) => {
error!("Unable to listen for interrupt signal: {}", err);
},
}
std::process::exit(0);
}
};
let task_status: status::Status = task_status.unwrap();
match task_status.state {
status::State::DownstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamShutdown(err) => {
error!("SHUTDOWN from: {}", err);
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::UpstreamRogue => {
error!("Changin Pool");
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
task_collector
.safe_lock(|s| {
for handle in s {
handle.abort();
}
})
.unwrap();
upstream_index += 1;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
break;
}
status::State::Healthy(msg) => {
info!("HEALTHY message: {}", msg);
}
}
}
}
}
/// Starts the proxy without any upstream pool or job declarator ("solo miner" mode).
///
/// The configuration is re-read through `process_cli_args`, the coinbase output is
/// derived from it, the downstream mining listener is started with no upstream and
/// no job declarator, and finally the template receiver is connected to the address
/// given by `tp_address`.
///
/// * `tx_status` — channel on which the started components report their status.
/// * `task_collector` — shared list of abort handles handed to the started components.
/// * `timeout` — used to build the `PoolChangerTrigger` given to the template receiver.
///
/// # Panics
///
/// Panics when the configuration cannot be loaded, when the coinbase output cannot
/// be built, when `downstream_address` is not an IP address, when `tp_address` is
/// not of the form `ip:port`, or when the downstream listener fails to start.
/// `main` runs this function as a spawned task, so such a panic ends that task only.
async fn initialize_jd_as_solo_miner(
    tx_status: async_channel::Sender<status::Status<'static>>,
    task_collector: Arc<Mutex<Vec<AbortHandle>>>,
    timeout: Duration,
) {
    let proxy_config = process_cli_args().expect("Unable to load the proxy configuration");
    let miner_tx_out = lib::proxy_config::get_coinbase_output(&proxy_config)
        .expect("Unable to build the coinbase output from the configuration");

    // Every configured address is validated before any network setup, so a malformed
    // value is reported before the downstream listener is started.
    let downstream_ip: &str = &proxy_config.downstream_address;
    let downstream_addr = SocketAddr::new(
        IpAddr::from_str(downstream_ip)
            .unwrap_or_else(|_| panic!("Invalid downstream address {}", downstream_ip)),
        proxy_config.downstream_port,
    );
    let tp_raw: &str = &proxy_config.tp_address;
    let tp_addr = {
        // Same rule as before: first `:`-separated field is the IP, second the port.
        let mut fields = tp_raw.split(':');
        let ip = fields.next().and_then(|ip| IpAddr::from_str(ip).ok());
        let port = fields.next().and_then(|port| port.parse::<u16>().ok());
        match (ip, port) {
            (Some(ip), Some(port)) => SocketAddr::new(ip, port),
            _ => panic!("Invalid tp address {}", tp_raw),
        }
    };

    let (send_solution, recv_solution) = bounded(10);
    let downstream = lib::downstream::listen_for_downstream_mining(
        downstream_addr,
        None,
        send_solution,
        proxy_config.withhold,
        proxy_config.authority_public_key,
        proxy_config.authority_secret_key,
        proxy_config.cert_validity_sec,
        task_collector.clone(),
        status::Sender::Downstream(tx_status.clone()),
        miner_tx_out.clone(),
        None,
    )
    .await
    .expect("Unable to start the downstream mining listener");

    TemplateRx::connect(
        tp_addr,
        recv_solution,
        status::Sender::TemplateReceiver(tx_status.clone()),
        None,
        downstream,
        task_collector,
        Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
        miner_tx_out.clone(),
        proxy_config.tp_authority_public_key,
        false,
    )
    .await;
}
/// Starts the proxy against one configured upstream.
///
/// In order, this function:
///
/// 1. re-reads the configuration through `process_cli_args` and validates every
///    address it is going to need;
/// 2. creates the upstream, starts its incoming-message parser and performs
///    `setup_connection` with the configured min/max supported versions;
/// 3. creates the `JobDeclarator` for `upstream_config.jd_address`;
/// 4. starts the downstream mining listener, handing it the upstream and the job
///    declarator;
/// 5. connects the template receiver to `tp_address`.
///
/// * `tx_status` — channel on which the started components report their status.
/// * `task_collector` — shared list of abort handles handed to the started components.
/// * `upstream_config` — pool address, job-declarator address, authority key and pool
///   signature of the upstream to use.
/// * `timeout` — used to build the `PoolChangerTrigger`s given to the upstream and to
///   the template receiver.
///
/// When the job declarator cannot be created, an `UpstreamShutdown` status is sent on
/// `tx_status` and the function returns.
///
/// # Panics
///
/// Panics when the configuration cannot be loaded, when the pool, tp or jd address is
/// not of the form `ip:port`, when `downstream_address` is not an IP address, when
/// the upstream cannot be created, parsed or connected, or when the downstream
/// listener fails to start. `main` runs this function as a spawned task, so such a
/// panic ends that task only.
async fn initialize_jd(
    tx_status: async_channel::Sender<status::Status<'static>>,
    task_collector: Arc<Mutex<Vec<AbortHandle>>>,
    upstream_config: lib::proxy_config::Upstream,
    timeout: Duration,
) {
    /// Parses `raw` as `ip:port`: the first `:`-separated field must be an IP
    /// address and the second a `u16` port; any further fields are ignored.
    ///
    /// Panics with `Invalid <label> address <raw>` otherwise.
    fn parse_ip_port(raw: &str, label: &str) -> SocketAddr {
        let mut fields = raw.split(':');
        let ip = fields.next().and_then(|ip| IpAddr::from_str(ip).ok());
        let port = fields.next().and_then(|port| port.parse::<u16>().ok());
        match (ip, port) {
            (Some(ip), Some(port)) => SocketAddr::new(ip, port),
            _ => panic!("Invalid {} address {}", label, raw),
        }
    }

    let proxy_config = process_cli_args().expect("Unable to load the proxy configuration");
    let test_only_do_not_send_solution_to_tp = proxy_config
        .test_only_do_not_send_solution_to_tp
        .unwrap_or(false);

    // Every configured address is validated before any connection is attempted, so a
    // malformed value is reported before the upstream has been contacted.
    let upstream_addr = parse_ip_port(&upstream_config.pool_address, "pool");
    let tp_addr = parse_ip_port(&proxy_config.tp_address, "tp");
    let jd_addr = parse_ip_port(&upstream_config.jd_address, "jd");
    let downstream_ip: &str = &proxy_config.downstream_address;
    let downstream_addr = SocketAddr::new(
        IpAddr::from_str(downstream_ip)
            .unwrap_or_else(|_| panic!("Invalid downstream address {}", downstream_ip)),
        proxy_config.downstream_port,
    );

    let (send_solution, recv_solution) = bounded(10);
    let upstream = match lib::upstream_sv2::Upstream::new(
        upstream_addr,
        upstream_config.authority_pubkey,
        // NOTE(review): the meaning of this literal is defined by `Upstream::new`,
        // which is not visible from this file.
        0,
        upstream_config.pool_signature.clone(),
        status::Sender::Upstream(tx_status.clone()),
        task_collector.clone(),
        Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
    )
    .await
    {
        Ok(upstream) => upstream,
        Err(e) => {
            error!("Failed to create upstream: {}", e);
            panic!()
        }
    };

    if let Err(e) = lib::upstream_sv2::Upstream::parse_incoming(upstream.clone()) {
        error!("failed to create sv2 parser: {}", e);
        panic!()
    }

    match lib::upstream_sv2::Upstream::setup_connection(
        upstream.clone(),
        proxy_config.min_supported_version,
        proxy_config.max_supported_version,
    )
    .await
    {
        Ok(_) => info!("Connected to Upstream!"),
        Err(e) => {
            error!("Failed to connect to Upstream EXITING! : {}", e);
            panic!()
        }
    }

    let jd = match JobDeclarator::new(
        jd_addr,
        upstream_config.authority_pubkey.into_bytes(),
        proxy_config.clone(),
        upstream.clone(),
        task_collector.clone(),
    )
    .await
    {
        Ok(c) => c,
        Err(e) => {
            // Report the failure as an upstream shutdown instead of panicking; a
            // failed send is deliberately ignored.
            let _ = tx_status
                .send(status::Status {
                    state: status::State::UpstreamShutdown(e),
                })
                .await;
            return;
        }
    };

    let downstream = lib::downstream::listen_for_downstream_mining(
        downstream_addr,
        Some(upstream),
        send_solution,
        proxy_config.withhold,
        proxy_config.authority_public_key,
        proxy_config.authority_secret_key,
        proxy_config.cert_validity_sec,
        task_collector.clone(),
        status::Sender::Downstream(tx_status.clone()),
        vec![],
        Some(jd.clone()),
    )
    .await
    .expect("Unable to start the downstream mining listener");

    TemplateRx::connect(
        tp_addr,
        recv_solution,
        status::Sender::TemplateReceiver(tx_status.clone()),
        Some(jd.clone()),
        downstream,
        task_collector,
        Arc::new(Mutex::new(PoolChangerTrigger::new(timeout))),
        vec![],
        proxy_config.tp_authority_public_key,
        test_only_do_not_send_solution_to_tp,
    )
    .await;
}