pool_sv2 0.2.1

SV2 pool role
use std::{
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    thread::JoinHandle,
};

use async_channel::unbounded;

use bitcoin_core_sv2::template_distribution_protocol::CancellationToken;
use stratum_apps::{
    stratum_core::bitcoin::consensus::Encodable, task_manager::TaskManager,
    tp_type::TemplateProviderType, utils::types::GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS,
};
use tokio::sync::{broadcast, Notify};
use tracing::{debug, error, info, warn};

use jd_server_sv2::job_declarator::{
    job_validation::{bitcoin_core_ipc::BitcoinCoreIPCEngine, JobValidationEngine},
    JobDeclarator,
};

use crate::{
    channel_manager::ChannelManager,
    config::PoolConfig,
    error::PoolErrorKind,
    status::State,
    template_receiver::{
        bitcoin_core::{connect_to_bitcoin_core, BitcoinCoreSv2TDPConfig},
        sv2_tp::Sv2Tp,
    },
};

pub mod channel_manager;
pub mod config;
pub mod downstream;
pub mod error;
mod io_task;
#[cfg(feature = "monitoring")]
mod monitoring;
pub mod status;
pub mod template_receiver;
pub mod utils;

#[derive(Debug, Clone)]
pub struct PoolSv2 {
    config: PoolConfig,
    cancellation_token: CancellationToken,
    shutdown_notify: Arc<Notify>,
    is_alive: Arc<AtomicBool>,
}

#[cfg_attr(not(test), hotpath::measure_all)]
impl PoolSv2 {
    pub fn new(config: PoolConfig) -> Self {
        Self {
            config,
            cancellation_token: CancellationToken::new(),
            shutdown_notify: Arc::new(Notify::new()),
            is_alive: Arc::new(AtomicBool::new(true)),
        }
    }

    /// Starts the Pool main loop.
    pub async fn start(&self) -> Result<(), PoolErrorKind> {
        let coinbase_outputs = vec![self.config.get_txout()];
        let mut encoded_outputs = vec![];

        coinbase_outputs
            .consensus_encode(&mut encoded_outputs)
            .expect("Invalid coinbase output in config");

        let cancellation_token = self.cancellation_token.clone();

        let task_manager = Arc::new(TaskManager::new());

        let (status_sender, status_receiver) = unbounded();

        let (channel_manager_to_downstream_sender, _channel_manager_to_downstream_receiver) =
            broadcast::channel(10);
        let (downstream_to_channel_manager_sender, downstream_to_channel_manager_receiver) =
            unbounded();

        let (channel_manager_to_tp_sender, channel_manager_to_tp_receiver) = unbounded();
        let (tp_to_channel_manager_sender, tp_to_channel_manager_receiver) = unbounded();

        debug!("Channels initialized.");

        // Build and launch embedded JDS if configured.
        let jds_config = self.config.build_jds_config()?;
        let mut job_declarator_for_shutdown = None;

        let job_declarator = if let Some(jds_config) = jds_config {
            info!("JDS config present — initializing embedded Job Declaration Server");

            let ipc_engine: Arc<dyn JobValidationEngine> =
                match self.config.template_provider_type() {
                    TemplateProviderType::BitcoinCoreIpc {
                        network, data_dir, ..
                    } => Arc::new(
                        BitcoinCoreIPCEngine::new(
                            network.clone(),
                            data_dir.clone(),
                            cancellation_token.clone(),
                        )
                        .await?,
                    ),
                    TemplateProviderType::Sv2Tp { .. } => {
                        return Err(PoolErrorKind::Configuration(
                            "[jds] requires template_provider_type = BitcoinCoreIpc \
                         (JDS needs direct IPC access to Bitcoin Core)"
                                .to_string(),
                        ));
                    }
                };

            let jd = JobDeclarator::new(
                ipc_engine,
                cancellation_token.clone(),
                jds_config.coinbase_reward_script().clone(),
                task_manager.clone(),
            )
            .await
            .map_err(PoolErrorKind::Jds)?;

            jd.clone()
                .start(cancellation_token.clone(), task_manager.clone())
                .await
                .map_err(|e| PoolErrorKind::Jds(e.into()))?;

            jd.clone()
                .start_downstream_server(
                    *jds_config.authority_public_key(),
                    *jds_config.authority_secret_key(),
                    jds_config.cert_validity_sec(),
                    *jds_config.listen_address(),
                    task_manager.clone(),
                    cancellation_token.clone(),
                    jds_config.supported_extensions().to_vec(),
                    jds_config.required_extensions().to_vec(),
                )
                .await
                .map_err(|e| PoolErrorKind::Jds(e.into()))?;

            info!(
                "JDS listening for JDP connections on {}",
                jds_config.listen_address()
            );

            job_declarator_for_shutdown = Some(jd.clone());
            Some(jd)
        } else {
            info!("No [jds] config — Job Declaration not available");
            None
        };

        let channel_manager = ChannelManager::new(
            self.config.clone(),
            channel_manager_to_tp_sender.clone(),
            tp_to_channel_manager_receiver,
            channel_manager_to_downstream_sender.clone(),
            downstream_to_channel_manager_receiver,
            encoded_outputs.clone(),
            job_declarator,
        )
        .await?;

        // Start monitoring server if configured
        #[cfg(feature = "monitoring")]
        if let Some(monitoring_addr) = self.config.monitoring_address() {
            info!(
                "Initializing monitoring server on http://{}",
                monitoring_addr
            );

            let monitoring_server = stratum_apps::monitoring::MonitoringServer::new(
                monitoring_addr,
                None, // Pool doesn't have channels opened with servers
                Some(Arc::new(channel_manager.clone())), // channels opened with clients
                std::time::Duration::from_secs(
                    self.config.monitoring_cache_refresh_secs().unwrap_or(15),
                ),
            )
            .expect("Failed to initialize monitoring server");

            let cancellation_token_clone = cancellation_token.clone();
            let shutdown_signal = async move {
                cancellation_token_clone.cancelled().await;
            };

            task_manager.spawn(async move {
                if let Err(e) = monitoring_server.run(shutdown_signal).await {
                    error!("Monitoring server error: {}", e);
                }
            });
        }

        let channel_manager_clone = channel_manager.clone();
        let channel_manager_for_cleanup = channel_manager.clone();
        let mut bitcoin_core_sv2_join_handle: Option<JoinHandle<()>> = None;

        match self.config.template_provider_type().clone() {
            TemplateProviderType::Sv2Tp {
                address,
                public_key,
            } => {
                let sv2_tp = Sv2Tp::new(
                    address.clone(),
                    public_key,
                    channel_manager_to_tp_receiver,
                    tp_to_channel_manager_sender,
                    cancellation_token.clone(),
                    task_manager.clone(),
                )
                .await?;

                sv2_tp
                    .start(
                        address,
                        cancellation_token.clone(),
                        status_sender.clone(),
                        task_manager.clone(),
                    )
                    .await?;

                info!("Sv2 Template Provider setup done");
            }
            TemplateProviderType::BitcoinCoreIpc {
                network,
                data_dir,
                fee_threshold,
                min_interval,
            } => {
                let unix_socket_path =
                    stratum_apps::tp_type::resolve_ipc_socket_path(&network, data_dir)
                        .ok_or_else(|| PoolErrorKind::Configuration(
                            "Could not determine Bitcoin data directory. Please set data_dir in config.".to_string()
                        ))?;

                info!(
                    "Using Bitcoin Core IPC socket at: {}",
                    unix_socket_path.display()
                );

                // incoming and outgoing TDP channels from the perspective of BitcoinCoreSv2TDP
                let incoming_tdp_receiver = channel_manager_to_tp_receiver.clone();
                let outgoing_tdp_sender = tp_to_channel_manager_sender.clone();

                let bitcoin_core_config = BitcoinCoreSv2TDPConfig {
                    unix_socket_path,
                    fee_threshold,
                    min_interval,
                    incoming_tdp_receiver,
                    outgoing_tdp_sender,
                    cancellation_token: CancellationToken::new(),
                };

                bitcoin_core_sv2_join_handle = Some(
                    connect_to_bitcoin_core(
                        bitcoin_core_config,
                        cancellation_token.clone(),
                        task_manager.clone(),
                        status_sender.clone(),
                    )
                    .await,
                );
            }
        }

        channel_manager
            .start(
                cancellation_token.clone(),
                status_sender.clone(),
                task_manager.clone(),
                coinbase_outputs,
            )
            .await?;

        channel_manager_clone
            .start_downstream_server(
                *self.config.authority_public_key(),
                *self.config.authority_secret_key(),
                self.config.cert_validity_sec(),
                *self.config.listen_address(),
                task_manager.clone(),
                cancellation_token.clone(),
                status_sender,
                downstream_to_channel_manager_sender,
                channel_manager_to_downstream_sender,
            )
            .await?;

        info!("Spawning status listener task...");
        loop {
            tokio::select! {
                _ = tokio::signal::ctrl_c() => {
                    info!("Ctrl+C received — initiating graceful shutdown...");
                    cancellation_token.cancel();
                    break;
                }
                _ = cancellation_token.cancelled() => {
                    break;
                }
                message = status_receiver.recv() => {
                    if let Ok(status) = message {
                        match status.state {
                            State::DownstreamShutdown{downstream_id,..} => {
                                warn!("Downstream {downstream_id:?} disconnected — cleaning up channel manager.");
                                // Remove downstream from channel manager to prevent memory leak
                                if let Err(e) = channel_manager_for_cleanup.remove_downstream(downstream_id) {
                                    error!("Failed to remove downstream {downstream_id:?}: {e:?}");
                                    cancellation_token.cancel();
                                    break;
                                }
                            }
                            State::TemplateReceiverShutdown(_) => {
                                warn!("Template Receiver shutdown requested — initiating full shutdown.");
                                cancellation_token.cancel();
                                break;
                            }
                            State::ChannelManagerShutdown(_) => {
                                warn!("Channel Manager shutdown requested — initiating full shutdown.");
                                cancellation_token.cancel();
                                break;
                            }
                        }
                    }
                }
            }
        }

        if let Some(ref jd) = job_declarator_for_shutdown {
            info!("Shutting down embedded JDS...");
            jd.shutdown();
        }

        if let Some(bitcoin_core_sv2_join_handle) = bitcoin_core_sv2_join_handle {
            info!("Waiting for BitcoinCoreSv2TDP dedicated thread to shutdown...");
            match bitcoin_core_sv2_join_handle.join() {
                Ok(_) => info!("BitcoinCoreSv2TDP dedicated thread shutdown complete."),
                Err(e) => error!("BitcoinCoreSv2TDP dedicated thread error: {e:?}"),
            }
        }

        warn!(
            "Graceful shutdown: waiting {} seconds for tasks to finish",
            GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS
        );

        match tokio::time::timeout(
            std::time::Duration::from_secs(GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS),
            task_manager.join_all(),
        )
        .await
        {
            Ok(_) => {
                info!("All tasks joined cleanly");
            }
            Err(_) => {
                warn!(
                    "Tasks did not finish within {} seconds, aborting",
                    GRACEFUL_SHUTDOWN_TIMEOUT_SECONDS
                );
                task_manager.abort_all().await;
                info!("Joining aborted tasks...");
                task_manager.join_all().await;
                warn!("Forced shutdown complete");
            }
        }
        self.shutdown_notify.notify_waiters();
        self.is_alive.store(false, Ordering::Relaxed);
        info!("Pool shutdown complete.");
        Ok(())
    }

    pub async fn shutdown(&self) {
        if !self.is_alive.load(Ordering::Relaxed) {
            return;
        }
        // The Notified future is guaranteed to receive wakeups from notify_waiters()
        // as soon as it has been created, even if it has not yet been polled.
        let notified = self.shutdown_notify.notified();
        self.cancellation_token.cancel();
        notified.await;
    }
}

impl Drop for PoolSv2 {
    fn drop(&mut self) {
        info!("PoolSv2 dropped");
        self.cancellation_token.cancel();
    }
}