use std::{
sync::{
atomic::{AtomicBool, Ordering},
Arc,
},
thread::JoinHandle,
};
use async_channel::unbounded;
use bitcoin_core_sv2::template_distribution_protocol::CancellationToken;
use stratum_apps::{
stratum_core::bitcoin::consensus::Encodable, task_manager::TaskManager,
tp_type::TemplateProviderType, utils::types::GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
};
use tokio::sync::{broadcast, Notify};
use tracing::{debug, error, info, warn};
use jd_server_sv2::job_declarator::{
job_validation::{bitcoin_core_ipc::BitcoinCoreIPCEngine, JobValidationEngine},
JobDeclarator,
};
use crate::{
channel_manager::ChannelManager,
config::PoolConfig,
error::PoolErrorKind,
status::State,
template_receiver::{
bitcoin_core::{connect_to_bitcoin_core, BitcoinCoreSv2TDPConfig},
sv2_tp::Sv2Tp,
},
};
pub mod channel_manager;
pub mod config;
pub mod downstream;
pub mod error;
mod io_task;
#[cfg(feature = "monitoring")]
mod monitoring;
pub mod status;
pub mod template_receiver;
pub mod utils;
#[derive(Debug, Clone)]
pub struct PoolSv2 {
config: PoolConfig,
cancellation_token: CancellationToken,
shutdown_notify: Arc<Notify>,
is_alive: Arc<AtomicBool>,
}
#[cfg_attr(not(test), hotpath::measure_all)]
impl PoolSv2 {
pub fn new(config: PoolConfig) -> Self {
Self {
config,
cancellation_token: CancellationToken::new(),
shutdown_notify: Arc::new(Notify::new()),
is_alive: Arc::new(AtomicBool::new(true)),
}
}
pub async fn start(&self) -> Result<(), PoolErrorKind> {
let coinbase_outputs = vec![self.config.get_txout()];
let mut encoded_outputs = vec![];
coinbase_outputs
.consensus_encode(&mut encoded_outputs)
.expect("Invalid coinbase output in config");
let cancellation_token = self.cancellation_token.clone();
let task_manager = Arc::new(TaskManager::new());
let (status_sender, status_receiver) = unbounded();
let (channel_manager_to_downstream_sender, _channel_manager_to_downstream_receiver) =
broadcast::channel(10);
let (downstream_to_channel_manager_sender, downstream_to_channel_manager_receiver) =
unbounded();
let (channel_manager_to_tp_sender, channel_manager_to_tp_receiver) = unbounded();
let (tp_to_channel_manager_sender, tp_to_channel_manager_receiver) = unbounded();
debug!("Channels initialized.");
let jds_config = self.config.build_jds_config()?;
let mut job_declarator_for_shutdown = None;
let job_declarator = if let Some(jds_config) = jds_config {
info!("JDS config present — initializing embedded Job Declaration Server");
let ipc_engine: Arc<dyn JobValidationEngine> =
match self.config.template_provider_type() {
TemplateProviderType::BitcoinCoreIpc {
network, data_dir, ..
} => Arc::new(
BitcoinCoreIPCEngine::new(
network.clone(),
data_dir.clone(),
cancellation_token.clone(),
)
.await?,
),
TemplateProviderType::Sv2Tp { .. } => {
return Err(PoolErrorKind::Configuration(
"[jds] requires template_provider_type = BitcoinCoreIpc \
(JDS needs direct IPC access to Bitcoin Core)"
.to_string(),
));
}
};
let jd = JobDeclarator::new(
ipc_engine,
cancellation_token.clone(),
jds_config.coinbase_reward_script().clone(),
task_manager.clone(),
)
.await
.map_err(PoolErrorKind::Jds)?;
jd.clone()
.start(cancellation_token.clone(), task_manager.clone())
.await
.map_err(|e| PoolErrorKind::Jds(e.into()))?;
jd.clone()
.start_downstream_server(
*jds_config.authority_public_key(),
*jds_config.authority_secret_key(),
jds_config.cert_validity_sec(),
*jds_config.listen_address(),
task_manager.clone(),
cancellation_token.clone(),
jds_config.supported_extensions().to_vec(),
jds_config.required_extensions().to_vec(),
)
.await
.map_err(|e| PoolErrorKind::Jds(e.into()))?;
info!(
"JDS listening for JDP connections on {}",
jds_config.listen_address()
);
job_declarator_for_shutdown = Some(jd.clone());
Some(jd)
} else {
info!("No [jds] config — Job Declaration not available");
None
};
let channel_manager = ChannelManager::new(
self.config.clone(),
channel_manager_to_tp_sender.clone(),
tp_to_channel_manager_receiver,
channel_manager_to_downstream_sender.clone(),
downstream_to_channel_manager_receiver,
encoded_outputs.clone(),
job_declarator,
)
.await?;
#[cfg(feature = "monitoring")]
if let Some(monitoring_addr) = self.config.monitoring_address() {
info!(
"Initializing monitoring server on http://{}",
monitoring_addr
);
let monitoring_server = stratum_apps::monitoring::MonitoringServer::new(
monitoring_addr,
None, Some(Arc::new(channel_manager.clone())), std::time::Duration::from_secs(
self.config.monitoring_cache_refresh_secs().unwrap_or(15),
),
)
.expect("Failed to initialize monitoring server");
let cancellation_token_clone = cancellation_token.clone();
let shutdown_signal = async move {
cancellation_token_clone.cancelled().await;
};
task_manager.spawn(async move {
if let Err(e) = monitoring_server.run(shutdown_signal).await {
error!("Monitoring server error: {}", e);
}
});
}
let channel_manager_clone = channel_manager.clone();
let channel_manager_for_cleanup = channel_manager.clone();
let mut bitcoin_core_sv2_join_handle: Option<JoinHandle<()>> = None;
match self.config.template_provider_type().clone() {
TemplateProviderType::Sv2Tp {
address,
public_key,
} => {
let sv2_tp = Sv2Tp::new(
address.clone(),
public_key,
channel_manager_to_tp_receiver,
tp_to_channel_manager_sender,
cancellation_token.clone(),
task_manager.clone(),
)
.await?;
sv2_tp
.start(
address,
cancellation_token.clone(),
status_sender.clone(),
task_manager.clone(),
)
.await?;
info!("Sv2 Template Provider setup done");
}
TemplateProviderType::BitcoinCoreIpc {
network,
data_dir,
fee_threshold,
min_interval,
} => {
let unix_socket_path =
stratum_apps::tp_type::resolve_ipc_socket_path(&network, data_dir)
.ok_or_else(|| PoolErrorKind::Configuration(
"Could not determine Bitcoin data directory. Please set data_dir in config.".to_string()
))?;
info!(
"Using Bitcoin Core IPC socket at: {}",
unix_socket_path.display()
);
let incoming_tdp_receiver = channel_manager_to_tp_receiver.clone();
let outgoing_tdp_sender = tp_to_channel_manager_sender.clone();
let bitcoin_core_config = BitcoinCoreSv2TDPConfig {
unix_socket_path,
fee_threshold,
min_interval,
incoming_tdp_receiver,
outgoing_tdp_sender,
cancellation_token: CancellationToken::new(),
};
bitcoin_core_sv2_join_handle = Some(
connect_to_bitcoin_core(
bitcoin_core_config,
cancellation_token.clone(),
task_manager.clone(),
status_sender.clone(),
)
.await,
);
}
}
channel_manager
.start(
cancellation_token.clone(),
status_sender.clone(),
task_manager.clone(),
coinbase_outputs,
)
.await?;
channel_manager_clone
.start_downstream_server(
*self.config.authority_public_key(),
*self.config.authority_secret_key(),
self.config.cert_validity_sec(),
*self.config.listen_address(),
task_manager.clone(),
cancellation_token.clone(),
status_sender,
downstream_to_channel_manager_sender,
channel_manager_to_downstream_sender,
)
.await?;
info!("Spawning status listener task...");
loop {
tokio::select! {
_ = tokio::signal::ctrl_c() => {
info!("Ctrl+C received — initiating graceful shutdown...");
cancellation_token.cancel();
break;
}
_ = cancellation_token.cancelled() => {
break;
}
message = status_receiver.recv() => {
if let Ok(status) = message {
match status.state {
State::DownstreamShutdown{downstream_id,..} => {
warn!("Downstream {downstream_id:?} disconnected — cleaning up channel manager.");
if let Err(e) = channel_manager_for_cleanup.remove_downstream(downstream_id) {
error!("Failed to remove downstream {downstream_id:?}: {e:?}");
cancellation_token.cancel();
break;
}
}
State::TemplateReceiverShutdown(_) => {
warn!("Template Receiver shutdown requested — initiating full shutdown.");
cancellation_token.cancel();
break;
}
State::ChannelManagerShutdown(_) => {
warn!("Channel Manager shutdown requested — initiating full shutdown.");
cancellation_token.cancel();
break;
}
}
}
}
}
}
if let Some(ref jd) = job_declarator_for_shutdown {
info!("Shutting down embedded JDS...");
jd.shutdown();
}
if let Some(bitcoin_core_sv2_join_handle) = bitcoin_core_sv2_join_handle {
info!("Waiting for BitcoinCoreSv2TDP dedicated thread to shutdown...");
match bitcoin_core_sv2_join_handle.join() {
Ok(_) => info!("BitcoinCoreSv2TDP dedicated thread shutdown complete."),
Err(e) => error!("BitcoinCoreSv2TDP dedicated thread error: {e:?}"),
}
}
warn!(
"Graceful shutdown: waiting {} seconds for tasks to finish",
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS
);
match tokio::time::timeout(
std::time::Duration::from_secs(GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS),
task_manager.join_all(),
)
.await
{
Ok(_) => {
info!("All tasks joined cleanly");
}
Err(_) => {
warn!(
"Tasks did not finish within {} seconds, aborting",
GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS
);
task_manager.abort_all().await;
info!("Joining aborted tasks...");
task_manager.join_all().await;
warn!("Forced shutdown complete");
}
}
self.shutdown_notify.notify_waiters();
self.is_alive.store(false, Ordering::Relaxed);
info!("Pool shutdown complete.");
Ok(())
}
pub async fn shutdown(&self) {
if !self.is_alive.load(Ordering::Relaxed) {
return;
}
let notified = self.shutdown_notify.notified();
self.cancellation_token.cancel();
notified.await;
}
}
impl Drop for PoolSv2 {
fn drop(&mut self) {
info!("PoolSv2 dropped");
self.cancellation_token.cancel();
}
}