use crate::{
mining_device::Secp256k1PublicKey as MiningDeviceSecp256k1PublicKey, sniffer::*,
sv1_minerd::MinerdProcess, template_provider::*,
};
use interceptor::InterceptAction;
use jd_client_sv2::{config::ConfigJDCMode, JobDeclaratorClient};
use once_cell::sync::OnceCell;
use pool_sv2::PoolSv2;
use std::{
convert::TryFrom,
net::{Ipv4Addr, SocketAddr},
time::Duration,
};
use stratum_apps::{
config_helpers::CoinbaseRewardScript,
key_utils::{Secp256k1PublicKey, Secp256k1SecretKey},
tp_type::TemplateProviderType,
};
use tracing::Level;
use tracing_subscriber::{fmt, layer::SubscriberExt, util::SubscriberInitExt, EnvFilter};
use translator_sv2::TranslatorSv2;
use utils::get_available_address;
pub mod interceptor;
pub mod message_aggregator;
pub mod mining_device;
pub mod mock_roles;
pub mod prometheus_metrics_assertions;
pub mod sniffer;
pub mod sniffer_error;
pub mod sv1_minerd;
pub mod sv1_sniffer;
pub mod template_provider;
pub mod types;
pub mod utils;
#[macro_export]
macro_rules! shutdown_all {
($($handle:expr),+ $(,)?) => {
tokio::join!($($handle.shutdown()),+)
};
}
const SHARES_PER_MINUTE: f32 = 120.0;
pub const POOL_COINBASE_REWARD_ADDRESS: &str = "tb1qa0sm0hxzj0x25rh8gw5xlzwlsfvvyz8u96w3p8";
const POOL_COINBASE_REWARD_DESCRIPTOR: &str = "addr(tb1qa0sm0hxzj0x25rh8gw5xlzwlsfvvyz8u96w3p8)";
const JDC_COINBASE_REWARD_DESCRIPTOR: &str = "addr(tb1qpusf5256yxv50qt0pm0tue8k952fsu5lzsphft)";
static LOGGER: OnceCell<()> = OnceCell::new();
pub fn sv2_tp_config(address: SocketAddr) -> TemplateProviderType {
TemplateProviderType::Sv2Tp {
address: address.to_string(),
public_key: None,
}
}
pub fn ipc_config(
data_dir: std::path::PathBuf,
is_signet: bool,
min_interval: Option<u8>,
) -> TemplateProviderType {
use stratum_apps::tp_type::BitcoinNetwork;
let network = if is_signet {
BitcoinNetwork::Signet
} else {
BitcoinNetwork::Regtest
};
TemplateProviderType::BitcoinCoreIpc {
network,
data_dir: Some(data_dir),
fee_threshold: 0,
min_interval: min_interval.unwrap_or(5),
}
}
pub fn start_tracing() {
LOGGER.get_or_init(|| {
let env_filter = EnvFilter::try_from_default_env()
.unwrap_or_else(|_| EnvFilter::new(Level::INFO.to_string()));
tracing_subscriber::registry()
.with(env_filter)
.with(fmt::layer())
.init();
});
}
pub fn start_sniffer(
identifier: &str,
upstream: SocketAddr,
check_on_drop: bool,
action: Vec<InterceptAction>,
timeout: Option<u64>,
) -> (Sniffer<'_>, SocketAddr) {
let listening_address = get_available_address();
let sniffer = Sniffer::new(
identifier,
listening_address,
upstream,
check_on_drop,
action,
timeout,
);
sniffer.start();
(sniffer, listening_address)
}
pub async fn start_pool(
template_provider_config: TemplateProviderType,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
enable_monitoring: bool,
) -> (PoolSv2, SocketAddr, Option<SocketAddr>) {
use pool_sv2::config::PoolConfig;
let listening_address = get_available_address();
let authority_public_key = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
.expect("failed");
let authority_secret_key = Secp256k1SecretKey::try_from(
"mkDLTBBRxdBv998612qipDYoTK3YUrqLe8uWw7gu3iXbSrn2n".to_string(),
)
.expect("failed");
let cert_validity_sec = 3600;
let coinbase_reward_script =
CoinbaseRewardScript::from_descriptor(POOL_COINBASE_REWARD_DESCRIPTOR).unwrap();
let pool_signature = "Stratum V2 SRI Pool".to_string();
let connection_config = pool_sv2::config::ConnectionConfig::new(
listening_address,
cert_validity_sec,
pool_signature,
);
let authority_config =
pool_sv2::config::AuthorityConfig::new(authority_public_key, authority_secret_key);
let share_batch_size = 1;
let monitoring_address = if enable_monitoring {
Some(get_available_address())
} else {
None
};
let monitoring_cache_refresh_secs = if enable_monitoring { Some(1) } else { None };
let config = PoolConfig::new(
connection_config,
template_provider_config,
authority_config,
coinbase_reward_script,
SHARES_PER_MINUTE,
share_batch_size,
1,
supported_extensions,
required_extensions,
monitoring_address,
monitoring_cache_refresh_secs,
None, );
let pool = PoolSv2::new(config);
let pool_clone = pool.clone();
tokio::spawn(async move {
_ = pool_clone.start().await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
(pool, listening_address, monitoring_address)
}
pub fn start_template_provider(
sv2_interval: Option<u32>,
difficulty_level: DifficultyLevel,
) -> (TemplateProvider, SocketAddr) {
let address = get_available_address();
let sv2_interval = sv2_interval.unwrap_or(20);
let template_provider = TemplateProvider::start(address.port(), sv2_interval, difficulty_level);
template_provider.generate_blocks(1);
(template_provider, address)
}
pub fn start_bitcoin_core(difficulty_level: DifficultyLevel) -> BitcoinCore {
let address = get_available_address();
let bitcoin_core = BitcoinCore::start(address.port(), difficulty_level);
bitcoin_core.generate_blocks(1);
bitcoin_core
}
pub fn start_jdc(
pool: &[(SocketAddr, SocketAddr)], template_provider_config: TemplateProviderType,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
enable_monitoring: bool,
jdc_mode: Option<ConfigJDCMode>,
) -> (JobDeclaratorClient, SocketAddr, Option<SocketAddr>) {
use jd_client_sv2::config::{JobDeclaratorClientConfig, PoolConfig, ProtocolConfig, Upstream};
let jdc_address = get_available_address();
let max_supported_version = 2;
let min_supported_version = 2;
let authority_public_key = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
.unwrap();
let authority_secret_key = Secp256k1SecretKey::try_from(
"mkDLTBBRxdBv998612qipDYoTK3YUrqLe8uWw7gu3iXbSrn2n".to_string(),
)
.unwrap();
let coinbase_reward_script =
CoinbaseRewardScript::from_descriptor(JDC_COINBASE_REWARD_DESCRIPTOR).unwrap();
let authority_pubkey = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
.unwrap();
let upstreams = pool
.iter()
.map(|(pool_addr, jds_addr)| {
Upstream::new(
authority_pubkey,
pool_addr.ip().to_string(),
pool_addr.port(),
jds_addr.ip().to_string(),
jds_addr.port(),
)
})
.collect();
let pool_config = PoolConfig::new(authority_public_key, authority_secret_key);
let protocol_config = ProtocolConfig::new(
max_supported_version,
min_supported_version,
coinbase_reward_script,
);
let shares_per_minute = 10.0;
let shares_batch_size = 1;
let user_identity = "IT-test".to_string();
let jdc_signature = "JDC".to_string();
let monitoring_address = if enable_monitoring {
Some(get_available_address())
} else {
None
};
let monitoring_cache_refresh_secs = if enable_monitoring { Some(1) } else { None };
let jd_client_proxy = JobDeclaratorClientConfig::new(
jdc_address,
protocol_config,
user_identity,
shares_per_minute,
shares_batch_size,
pool_config,
3600,
template_provider_config,
upstreams,
jdc_signature,
jdc_mode,
supported_extensions,
required_extensions,
monitoring_address,
monitoring_cache_refresh_secs,
);
let ret = jd_client_sv2::JobDeclaratorClient::new(jd_client_proxy);
let ret_clone = ret.clone();
tokio::spawn(async move { ret_clone.start().await });
(ret, jdc_address, monitoring_address)
}
pub async fn start_pool_with_jds(
bitcoin_core: &BitcoinCore,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
enable_monitoring: bool,
) -> (PoolSv2, SocketAddr, SocketAddr, Option<SocketAddr>) {
use pool_sv2::config::{AuthorityConfig, ConnectionConfig, JDSPartialConfig, PoolConfig};
let pool_address = get_available_address();
let jds_address = get_available_address();
let authority_public_key = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
.expect("failed");
let authority_secret_key = Secp256k1SecretKey::try_from(
"mkDLTBBRxdBv998612qipDYoTK3YUrqLe8uWw7gu3iXbSrn2n".to_string(),
)
.expect("failed");
let cert_validity_sec = 3600;
let coinbase_reward_script =
CoinbaseRewardScript::from_descriptor(POOL_COINBASE_REWARD_DESCRIPTOR).unwrap();
let template_provider_config = ipc_config(
bitcoin_core.data_dir().clone(),
bitcoin_core.is_signet(),
None,
);
let pool_signature = "Stratum V2 SRI Pool".to_string();
let connection_config = ConnectionConfig::new(pool_address, cert_validity_sec, pool_signature);
let authority_config = AuthorityConfig::new(authority_public_key, authority_secret_key);
let share_batch_size = 1;
let monitoring_address = if enable_monitoring {
Some(get_available_address())
} else {
None
};
let monitoring_cache_refresh_secs = if enable_monitoring { Some(1) } else { None };
let config = PoolConfig::new(
connection_config,
template_provider_config,
authority_config,
coinbase_reward_script.clone(),
SHARES_PER_MINUTE,
share_batch_size,
1,
supported_extensions.clone(),
required_extensions.clone(),
monitoring_address,
monitoring_cache_refresh_secs,
Some(JDSPartialConfig::new(jds_address)),
);
let pool = PoolSv2::new(config);
let pool_clone = pool.clone();
tokio::spawn(async move {
_ = pool_clone.start().await;
});
tokio::time::sleep(Duration::from_secs(1)).await;
(pool, pool_address, jds_address, monitoring_address)
}
pub async fn start_sv2_translator(
upstreams: &[SocketAddr],
aggregate_channels: bool,
supported_extensions: Vec<u16>,
required_extensions: Vec<u16>,
job_keepalive_interval_secs: Option<u16>,
enable_monitoring: bool,
) -> (TranslatorSv2, SocketAddr, Option<SocketAddr>) {
let job_keepalive_interval_secs = job_keepalive_interval_secs.unwrap_or(60);
let upstreams = upstreams
.iter()
.map(|upstream| {
let upstream_address = upstream.ip().to_string();
let upstream_port = upstream.port();
let upstream_authority_pubkey = Secp256k1PublicKey::try_from(
"9auqWEzQDVyd2oe1JVGFLMLHZtCo2FFqZwtKA5gd9xbuEu7PH72".to_string(),
)
.expect("failed");
translator_sv2::config::Upstream::new(
upstream_address,
upstream_port,
upstream_authority_pubkey,
)
})
.collect();
let listening_address = get_available_address();
let listening_port = listening_address.port();
let minerd_process = MinerdProcess::new(SocketAddr::from((Ipv4Addr::LOCALHOST, 0)), false)
.await
.unwrap();
let min_individual_miner_hashrate = minerd_process.measure_hashrate().await.unwrap() as f32;
let downstream_difficulty_config = translator_sv2::config::DownstreamDifficultyConfig::new(
min_individual_miner_hashrate,
SHARES_PER_MINUTE,
true,
job_keepalive_interval_secs,
);
let downstream_extranonce2_size = 4;
let monitoring_address = if enable_monitoring {
Some(get_available_address())
} else {
None
};
let monitoring_cache_refresh_secs = if enable_monitoring { Some(1) } else { None };
let config = translator_sv2::config::TranslatorConfig::new(
upstreams,
listening_address.ip().to_string(),
listening_port,
downstream_difficulty_config,
2,
2,
downstream_extranonce2_size,
"user_identity".to_string(),
aggregate_channels,
supported_extensions,
required_extensions,
monitoring_address,
monitoring_cache_refresh_secs,
);
let translator_v2 = translator_sv2::TranslatorSv2::new(config);
let clone_translator_v2 = translator_v2.clone();
tokio::spawn(async move {
clone_translator_v2.start().await;
});
(translator_v2, listening_address, monitoring_address)
}
pub async fn start_minerd(
upstream_addr: SocketAddr,
username: Option<String>,
password: Option<String>,
single_submit: bool,
) -> (sv1_minerd::MinerdProcess, SocketAddr) {
let (process, local_addr) =
sv1_minerd::start_minerd(upstream_addr, username, password, single_submit)
.await
.expect("Failed to start minerd process");
(process, local_addr)
}
pub fn start_mining_device_sv2(
upstream: SocketAddr,
pub_key: Option<MiningDeviceSecp256k1PublicKey>,
device_id: Option<String>,
user_id: Option<String>,
handicap: u32,
nominal_hashrate_multiplier: Option<f32>,
single_submit: bool,
) {
tokio::spawn(async move {
crate::mining_device::connect(
upstream.to_string(),
pub_key,
device_id,
user_id,
handicap,
nominal_hashrate_multiplier,
single_submit,
)
.await;
});
}
pub fn start_sv1_sniffer(upstream_address: SocketAddr) -> (sv1_sniffer::SnifferSV1, SocketAddr) {
let listening_address = get_available_address();
let sniffer_sv1 = sv1_sniffer::SnifferSV1::new(listening_address, upstream_address);
sniffer_sv1.start();
(sniffer_sv1, listening_address)
}