Skip to main content

abtc_infrastructure/
lib.rs

1//! Bitcoin Infrastructure Layer - Composition Root
2//!
3//! This is the outermost layer that wires everything together.
4//! It serves as the composition root for dependency injection and
5//! the entry point for the entire Bitcoin implementation.
6//!
7//! ## Graceful Shutdown
8//!
9//! All background tasks listen to a shutdown signal via `tokio::sync::watch`.
10//! The `run()` entry point waits for SIGINT (Ctrl+C) or SIGTERM, then triggers
11//! an orderly shutdown with a configurable timeout.
12
13use abtc_adapters::{
14    FileBasedWalletStore, InMemoryBlockStore, InMemoryChainStateStore, InMemoryMempool,
15    InMemoryWallet, JsonRpcServer, PersistentWallet, SimpleMiner, StubPeerManager, TcpPeerManager,
16};
17use abtc_application::block_index::BlockIndex;
18use abtc_application::fee_estimator::FeeEstimator;
19use abtc_application::handlers::{BlockchainRpcHandler, MiningRpcHandler, WalletRpcHandler};
20use abtc_application::net_processing::{SyncAction, SyncManager, SyncState};
21use abtc_application::rebroadcast::RebroadcastManager;
22use abtc_application::services::{BlockchainService, MempoolService, MiningService};
23use abtc_domain::wallet::address::AddressType;
24use abtc_domain::ChainParams;
25use abtc_ports::{
26    BlockStore, ChainStateStore, MempoolPort, PeerEvent, PeerManager, RpcServer, WalletPort,
27};
28use clap::Parser;
29use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
30use std::sync::Arc;
31use tokio::sync::RwLock;
32use tracing_subscriber::EnvFilter;
33
/// Grace period, in seconds, granted to services once the shutdown signal has
/// been sent. If they have not stopped when it elapses, the process exits
/// forcefully.
const SHUTDOWN_TIMEOUT_SECS: u64 = 10;
37
/// Point-in-time health snapshot of the node: background-task liveness plus a
/// few headline figures about overall node state.
#[derive(Debug, Clone)]
pub struct NodeHealth {
    /// `true` while the node is running, i.e. not shutting down.
    pub is_running: bool,
    /// How many background tasks are currently alive.
    pub active_tasks: u32,
    /// How many background tasks were spawned in total.
    pub total_tasks: u32,
    /// Seconds elapsed since the node started.
    pub uptime_secs: u64,
    /// Textual form of the current sync state.
    pub sync_state: String,
    /// Height reported by the block index.
    pub block_height: u32,
    /// Number of transactions in the mempool.
    pub mempool_size: u32,
    /// Number of connected peers.
    pub peer_count: u32,
    /// Whether the RPC server is running.
    pub rpc_running: bool,
}
60
/// Bookkeeping for spawned background tasks: how many were launched and how
/// many of them are still alive.
struct TaskTracker {
    /// Live-task counter, shared with every `TaskGuard` handed out.
    active: Arc<AtomicU64>,
    /// Count of all tasks ever registered.
    total: u32,
}

impl TaskTracker {
    /// Create a tracker with no registered tasks.
    fn new() -> Self {
        Self {
            active: Arc::new(AtomicU64::new(0)),
            total: 0,
        }
    }

    /// Record one more task and hand back the guard that represents it.
    ///
    /// The live count goes up immediately and comes back down when the
    /// returned guard is dropped.
    fn register(&mut self) -> TaskGuard {
        self.total += 1;
        let counter = Arc::clone(&self.active);
        counter.fetch_add(1, Ordering::Relaxed);
        TaskGuard { active: counter }
    }

    /// Number of registered tasks whose guard has not been dropped yet.
    fn active_count(&self) -> u64 {
        self.active.load(Ordering::Relaxed)
    }
}

/// Drop guard tied to one registered task; dropping it lowers the shared
/// live-task counter by one.
struct TaskGuard {
    active: Arc<AtomicU64>,
}

impl Drop for TaskGuard {
    fn drop(&mut self) {
        self.active.fetch_sub(1, Ordering::Relaxed);
    }
}
101
102/// Command-line arguments for the Bitcoin node
103#[derive(Parser, Debug)]
104#[command(
105    name = "Agentic Bitcoin",
106    about = "A Bitcoin Core reimplementation in Rust with hexagonal architecture",
107    version
108)]
109pub struct CliArgs {
110    /// Network to connect to (mainnet, testnet, regtest, signet)
111    #[arg(long, default_value = "mainnet")]
112    pub network: String,
113
114    /// Data directory for storing blockchain data
115    #[arg(long, default_value = "~/.bitcoin")]
116    pub datadir: String,
117
118    /// RPC server port
119    #[arg(long, default_value = "8332")]
120    pub rpc_port: u16,
121
122    /// P2P network port
123    #[arg(long, default_value = "8333")]
124    pub p2p_port: u16,
125
126    /// Log level (trace, debug, info, warn, error)
127    #[arg(long, default_value = "info")]
128    pub log_level: String,
129
130    /// Maximum mempool size in megabytes
131    #[arg(long, default_value = "300")]
132    pub max_mempool_mb: u64,
133
134    /// Enable real TCP P2P networking (default: stub/offline mode)
135    #[arg(long, default_value = "false")]
136    pub enable_p2p: bool,
137
138    /// Seed peer addresses (comma-separated) for initial connections
139    #[arg(long, value_delimiter = ',')]
140    pub seed_peers: Vec<String>,
141
142    /// Storage backend: "memory" (default) or "rocksdb" (requires rocksdb-storage feature)
143    #[arg(long, default_value = "memory")]
144    pub storage_backend: String,
145
146    /// Enable wallet functionality
147    #[arg(long, default_value = "true")]
148    pub enable_wallet: bool,
149
150    /// Wallet address type: "bech32" (P2WPKH, default), "legacy" (P2PKH), or "p2sh-segwit" (P2SH-P2WPKH)
151    #[arg(long, default_value = "bech32")]
152    pub address_type: String,
153
154    /// Path to wallet file for persistent storage (optional).
155    /// When set, wallet state (keys, UTXOs, metadata) is saved to this
156    /// JSON file after every mutation and loaded on startup.
157    #[arg(long)]
158    pub wallet_file: Option<String>,
159
160    /// Custom signet challenge script (hex string, optional).
161    /// Overrides the default signet challenge when network is "signet".
162    /// Enables custom/private signet networks.
163    #[arg(long)]
164    pub signet_challenge: Option<String>,
165}
166
167/// Application state containing all services and ports
168pub struct BitcoinNode {
169    pub blockchain: Arc<BlockchainService>,
170    pub mempool: Arc<MempoolService>,
171    pub mining: Arc<MiningService>,
172    pub rpc_server: Arc<JsonRpcServer>,
173    pub peer_manager: Arc<dyn PeerManager>,
174    pub mempool_adapter: Arc<InMemoryMempool>,
175    pub block_index: Arc<RwLock<BlockIndex>>,
176    pub sync_manager: Arc<RwLock<SyncManager>>,
177    /// Fee estimator — updated every time a block is connected
178    pub fee_estimator: Arc<RwLock<FeeEstimator>>,
179    /// Rebroadcast manager — tracks wallet txs for periodic re-announcement
180    pub rebroadcast_manager: Arc<RwLock<RebroadcastManager>>,
181    /// Block store (for sync manager use)
182    block_store: Arc<dyn BlockStore>,
183    /// Chain state store (for sync manager use)
184    chain_state: Arc<dyn ChainStateStore>,
185    /// Optional TCP peer manager (only when enable_p2p is true)
186    tcp_peer_manager: Option<Arc<TcpPeerManager>>,
187    /// Seed peers to connect to on start
188    seed_peers: Vec<String>,
189    /// Wallet (optional — only when enable_wallet is true)
190    pub wallet: Option<Arc<dyn WalletPort>>,
191    /// Shutdown signal sender — set to `true` to initiate graceful shutdown.
192    shutdown_tx: tokio::sync::watch::Sender<bool>,
193    /// Shutdown signal receiver — background tasks clone and listen to this.
194    shutdown_rx: tokio::sync::watch::Receiver<bool>,
195    /// Background task tracker — monitors liveness of spawned tasks.
196    task_tracker: Arc<std::sync::Mutex<TaskTracker>>,
197    /// Node start time (Unix timestamp) for uptime calculation.
198    start_time: Arc<AtomicU64>,
199    /// Whether the node is running (not shutting down).
200    running: Arc<AtomicBool>,
201}
202
203impl BitcoinNode {
204    /// Create and wire all components
205    pub async fn new(args: CliArgs) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
206        // Initialize tracing/logging.
207        // Use try_init() so that concurrent tests (or repeated calls) don't
208        // panic if a global subscriber has already been installed.
209        let _ = tracing_subscriber::fmt()
210            .with_env_filter(
211                EnvFilter::try_from_default_env()
212                    .unwrap_or_else(|_| EnvFilter::new(&args.log_level)),
213            )
214            .try_init();
215
216        tracing::info!("Initializing Agentic Bitcoin node on {}", args.network);
217
218        // Parse network
219        let network = match args.network.as_str() {
220            "mainnet" => abtc_domain::Network::Mainnet,
221            "testnet" => abtc_domain::Network::Testnet,
222            "regtest" => abtc_domain::Network::Regtest,
223            "signet" => abtc_domain::Network::Signet,
224            _ => {
225                return Err(format!("Unknown network: {}", args.network).into());
226            }
227        };
228
229        let mut chain_params = ChainParams::for_network(network);
230
231        // Apply custom signet challenge if provided
232        if let Some(ref challenge_hex) = args.signet_challenge {
233            match hex::decode(challenge_hex) {
234                Ok(challenge_bytes) => {
235                    chain_params.consensus.signet_challenge = Some(challenge_bytes);
236                    tracing::info!("Using custom signet challenge: {}", challenge_hex);
237                }
238                Err(e) => {
239                    return Err(format!(
240                        "Invalid --signet-challenge hex: {}: {}",
241                        challenge_hex, e
242                    )
243                    .into());
244                }
245            }
246        }
247
248        tracing::info!("Using chain parameters for {}", network);
249
250        // Create storage adapters based on backend flag
251        let (block_store, chain_state): (Arc<dyn BlockStore>, Arc<dyn ChainStateStore>) =
252            match args.storage_backend.as_str() {
253                #[cfg(feature = "rocksdb-storage")]
254                "rocksdb" => {
255                    use abtc_adapters::{RocksDbBlockStore, RocksDbChainStateStore};
256                    use std::path::Path;
257
258                    let datadir = args
259                        .datadir
260                        .replace('~', &std::env::var("HOME").unwrap_or_default());
261                    let blocks_path = format!("{}/blocks", datadir);
262                    let chainstate_path = format!("{}/chainstate", datadir);
263
264                    // Create directories if they don't exist
265                    std::fs::create_dir_all(&blocks_path)?;
266                    std::fs::create_dir_all(&chainstate_path)?;
267
268                    tracing::info!(
269                        "Using RocksDB storage (blocks: {}, chainstate: {})",
270                        blocks_path,
271                        chainstate_path
272                    );
273
274                    let bs = Arc::new(RocksDbBlockStore::open(Path::new(&blocks_path))?);
275                    let cs = Arc::new(RocksDbChainStateStore::open(Path::new(&chainstate_path))?);
276                    (bs as Arc<dyn BlockStore>, cs as Arc<dyn ChainStateStore>)
277                }
278                #[cfg(not(feature = "rocksdb-storage"))]
279                "rocksdb" => {
280                    return Err("RocksDB storage requires the 'rocksdb-storage' feature. \
281                         Build with: cargo build --features rocksdb-storage"
282                        .into());
283                }
284                _ => {
285                    tracing::info!("Using in-memory storage backend");
286                    let bs = Arc::new(InMemoryBlockStore::new());
287                    let cs = Arc::new(InMemoryChainStateStore::new());
288                    (bs as Arc<dyn BlockStore>, cs as Arc<dyn ChainStateStore>)
289                }
290            };
291
292        let rpc_server = Arc::new(JsonRpcServer::new(args.rpc_port));
293        let mempool_adapter = Arc::new(InMemoryMempool::with_max_bytes(
294            args.max_mempool_mb * 1_000_000,
295        ));
296
297        // Create peer manager: real TCP or stub depending on flag
298        let (peer_manager, tcp_peer_manager): (Arc<dyn PeerManager>, Option<Arc<TcpPeerManager>>) =
299            if args.enable_p2p {
300                let local_addr: std::net::SocketAddr =
301                    format!("0.0.0.0:{}", args.p2p_port).parse()?;
302                let tcp_mgr = Arc::new(TcpPeerManager::new(local_addr));
303                tracing::info!("P2P networking enabled on port {}", args.p2p_port);
304                (tcp_mgr.clone() as Arc<dyn PeerManager>, Some(tcp_mgr))
305            } else {
306                let stub = Arc::new(StubPeerManager::new());
307                tracing::info!("P2P networking disabled (stub mode)");
308                (stub as Arc<dyn PeerManager>, None)
309            };
310
311        // Initialize with genesis block.
312        // We use store_block + write_chain_tip which ARE on the traits,
313        // rather than init_with_genesis which is only on concrete types.
314        let genesis = chain_params.genesis_block();
315        let genesis_hash = genesis.block_hash();
316
317        // Store genesis block at height 0
318        if !block_store.has_block(&genesis_hash).await.unwrap_or(false) {
319            block_store
320                .store_block(&genesis, 0)
321                .await
322                .map_err(|e| format!("Failed to store genesis block: {}", e))?;
323        }
324
325        // Set chain tip to genesis
326        chain_state
327            .write_chain_tip(genesis_hash, 0)
328            .await
329            .map_err(|e| format!("Failed to set genesis chain tip: {}", e))?;
330
331        tracing::info!(
332            "Initialized blockchain with genesis block: {}",
333            genesis_hash
334        );
335
336        // Initialize block index with genesis header and checkpoints
337        let mut block_index = BlockIndex::new();
338        block_index.init_genesis(genesis.header.clone());
339        block_index.load_checkpoints(&chain_params.checkpoints);
340        let block_index = Arc::new(RwLock::new(block_index));
341
342        tracing::info!("Block index initialized with genesis header");
343
344        // Create sync manager
345        let sync_manager = Arc::new(RwLock::new(SyncManager::new(block_index.clone())));
346
347        // Create application services
348        let blockchain = Arc::new(BlockchainService::with_params(
349            block_store.clone(),
350            chain_state.clone(),
351            peer_manager.clone(),
352            chain_params.consensus.clone(),
353        ));
354
355        // Wire the real mempool adapter
356        let mempool = Arc::new(MempoolService::new(
357            mempool_adapter.clone() as Arc<dyn MempoolPort>,
358            chain_state.clone(),
359        ));
360
361        let template_provider = Arc::new(SimpleMiner::new());
362        let mining = Arc::new(MiningService::with_params(
363            template_provider,
364            blockchain.clone(),
365            chain_params.consensus.clone(),
366        ));
367
368        // Create fee estimator
369        let fee_estimator = Arc::new(RwLock::new(FeeEstimator::new()));
370
371        // Create rebroadcast manager
372        let rebroadcast_manager = Arc::new(RwLock::new(RebroadcastManager::new()));
373
374        // Register RPC handlers
375        let bc_handler = Box::new(BlockchainRpcHandler::new(
376            blockchain.clone(),
377            mempool.clone(),
378            fee_estimator.clone(),
379            chain_state.clone(),
380            block_index.clone(),
381        ));
382        rpc_server.register_handler(bc_handler).await?;
383
384        let mining_handler = Box::new(MiningRpcHandler::new(mining.clone()));
385        rpc_server.register_handler(mining_handler).await?;
386
387        // Create wallet if enabled
388        let mainnet = matches!(network, abtc_domain::Network::Mainnet);
389        let wallet: Option<Arc<dyn WalletPort>> = if args.enable_wallet {
390            let addr_type = match args.address_type.as_str() {
391                "legacy" | "p2pkh" => AddressType::P2PKH,
392                "p2sh-segwit" | "p2sh" => AddressType::P2shP2wpkh,
393                _ => AddressType::P2WPKH, // "bech32" and default
394            };
395
396            let in_memory = Arc::new(InMemoryWallet::new(mainnet, addr_type));
397
398            let wallet: Arc<dyn WalletPort> = if let Some(ref wallet_path) = args.wallet_file {
399                // Persistent wallet: wrap InMemoryWallet with file-based store
400                let store = Arc::new(FileBasedWalletStore::new(wallet_path));
401                let persistent = PersistentWallet::new(in_memory, store)
402                    .await
403                    .map_err(|e| format!("Failed to initialize persistent wallet: {}", e))?;
404                tracing::info!(
405                    "Wallet enabled (address type: {}, persistence: {})",
406                    args.address_type,
407                    wallet_path
408                );
409                Arc::new(persistent)
410            } else {
411                // In-memory only wallet (no persistence)
412                tracing::info!(
413                    "Wallet enabled (address type: {}, in-memory only)",
414                    args.address_type
415                );
416                in_memory
417            };
418
419            // Register wallet RPC handler
420            let wallet_handler = Box::new(WalletRpcHandler::new(wallet.clone()));
421            rpc_server.register_handler(wallet_handler).await?;
422
423            Some(wallet)
424        } else {
425            tracing::info!("Wallet disabled");
426            None
427        };
428
429        tracing::info!("Agentic Bitcoin node initialized successfully");
430
431        let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
432
433        let start_time = std::time::SystemTime::now()
434            .duration_since(std::time::UNIX_EPOCH)
435            .map(|d| d.as_secs())
436            .unwrap_or(0);
437
438        Ok(BitcoinNode {
439            blockchain,
440            mempool,
441            mining,
442            rpc_server,
443            peer_manager,
444            mempool_adapter,
445            block_index,
446            sync_manager,
447            fee_estimator,
448            rebroadcast_manager,
449            block_store,
450            chain_state,
451            tcp_peer_manager,
452            seed_peers: args.seed_peers,
453            wallet,
454            shutdown_tx,
455            shutdown_rx,
456            task_tracker: Arc::new(std::sync::Mutex::new(TaskTracker::new())),
457            start_time: Arc::new(AtomicU64::new(start_time)),
458            running: Arc::new(AtomicBool::new(false)),
459        })
460    }
461
462    /// Start the Bitcoin node
463    pub async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
464        tracing::info!("Starting Agentic Bitcoin node");
465        self.running.store(true, Ordering::Release);
466
467        // Start RPC server (now actually listens for HTTP connections)
468        self.rpc_server.start().await?;
469        tracing::info!("RPC server started on port {}", self.rpc_server.get_port());
470
471        // Connect to seed peers if P2P is enabled
472        if !self.seed_peers.is_empty() {
473            for seed in &self.seed_peers {
474                match seed.parse::<std::net::SocketAddr>() {
475                    Ok(addr) => match self.peer_manager.connect_peer(addr).await {
476                        Ok(id) => tracing::info!("Connected to seed peer {} (id: {})", addr, id),
477                        Err(e) => tracing::warn!("Failed to connect to seed peer {}: {}", addr, e),
478                    },
479                    Err(e) => tracing::warn!("Invalid seed peer address '{}': {}", seed, e),
480                }
481            }
482        }
483
484        // Start background tasks
485        self.start_background_tasks().await;
486
487        Ok(())
488    }
489
490    /// Launch background processing loops — all tasks listen to the shutdown signal.
491    async fn start_background_tasks(&self) {
492        let mut tracker = self.task_tracker.lock().unwrap();
493
494        // ── Task 1: Mempool maintenance ─────────────────────────
495        {
496            let mempool_adapter = self.mempool_adapter.clone();
497            let mut shutdown = self.shutdown_rx.clone();
498            let guard = tracker.register();
499            tokio::spawn(async move {
500                let _guard = guard;
501                let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
502                loop {
503                    tokio::select! {
504                        _ = interval.tick() => {
505                            let size = mempool_adapter.size().await;
506                            if size > 0 {
507                                tracing::info!("Mempool: {} transactions", size);
508                            }
509                        }
510                        _ = shutdown.changed() => {
511                            tracing::debug!("Mempool maintenance task shutting down");
512                            break;
513                        }
514                    }
515                }
516            });
517        }
518
519        // ── Task 2: Peer connection maintenance ─────────────────
520        {
521            let peer_manager = self.peer_manager.clone();
522            let mut shutdown = self.shutdown_rx.clone();
523            let guard = tracker.register();
524            tokio::spawn(async move {
525                let _guard = guard;
526                let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
527                loop {
528                    tokio::select! {
529                        _ = interval.tick() => {
530                            match peer_manager.get_connected_peers().await {
531                                Ok(peers) => {
532                                    if !peers.is_empty() {
533                                        tracing::debug!("Connected peers: {}", peers.len());
534                                    }
535                                }
536                                Err(e) => tracing::debug!("Error checking peers: {}", e),
537                            }
538                        }
539                        _ = shutdown.changed() => {
540                            tracing::debug!("Peer maintenance task shutting down");
541                            break;
542                        }
543                    }
544                }
545            });
546        }
547
548        // ── Task 3: Sync status reporting ───────────────────────
549        {
550            let sync_manager = self.sync_manager.clone();
551            let block_index = self.block_index.clone();
552            let mut shutdown = self.shutdown_rx.clone();
553            let guard = tracker.register();
554            tokio::spawn(async move {
555                let _guard = guard;
556                let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
557                loop {
558                    tokio::select! {
559                        _ = interval.tick() => {
560                            let sm = sync_manager.read().await;
561                            let state = sm.state();
562                            let remaining = sm.blocks_remaining();
563                            drop(sm);
564
565                            let idx = block_index.read().await;
566                            let height = idx.best_height();
567                            let headers = idx.header_count();
568                            drop(idx);
569
570                            match state {
571                                SyncState::HeaderSync => {
572                                    tracing::info!(
573                                        "Sync: downloading headers ({} known, height {})",
574                                        headers, height
575                                    );
576                                }
577                                SyncState::BlockSync => {
578                                    tracing::info!(
579                                        "Sync: downloading blocks ({} remaining, index height {})",
580                                        remaining, height
581                                    );
582                                }
583                                SyncState::Synced => {
584                                    tracing::debug!(
585                                        "Sync: fully synced at height {} ({} headers)",
586                                        height, headers
587                                    );
588                                }
589                                SyncState::Idle => {}
590                            }
591                        }
592                        _ = shutdown.changed() => {
593                            tracing::debug!("Sync reporter task shutting down");
594                            break;
595                        }
596                    }
597                }
598            });
599        }
600
601        // ── Task 4: Transaction rebroadcast ─────────────────────
602        {
603            let rebroadcast_mgr = self.rebroadcast_manager.clone();
604            let rebroadcast_mempool = self.mempool_adapter.clone();
605            let rebroadcast_peers = self.peer_manager.clone();
606            let mut shutdown = self.shutdown_rx.clone();
607            let guard = tracker.register();
608            tokio::spawn(async move {
609                let _guard = guard;
610                let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5 * 60));
611                loop {
612                    tokio::select! {
613                        _ = interval.tick() => {
614                            let now = std::time::SystemTime::now()
615                                .duration_since(std::time::UNIX_EPOCH)
616                                .map(|d| d.as_secs())
617                                .unwrap_or(0);
618
619                            let mempool_ref = &rebroadcast_mempool;
620                            let mut mgr = rebroadcast_mgr.write().await;
621
622                            let tracked_txids: Vec<abtc_domain::primitives::Txid> = mgr
623                                .check_rebroadcast(now, |txid| {
624                                    let _ = txid;
625                                    true
626                                })
627                                .into_iter()
628                                .filter_map(|action| match action {
629                                    abtc_application::rebroadcast::RebroadcastAction::Reannounce(txid) => {
630                                        Some(txid)
631                                    }
632                                    abtc_application::rebroadcast::RebroadcastAction::Abandon(txid) => {
633                                        tracing::info!("Abandoning rebroadcast of tx {}", txid);
634                                        None
635                                    }
636                                })
637                                .collect();
638
639                            drop(mgr);
640
641                            for txid in tracked_txids {
642                                if let Ok(Some(_)) = mempool_ref.get_transaction(&txid).await {
643                                    let inv = abtc_ports::NetworkMessage::Inv {
644                                        items: vec![abtc_ports::InventoryItem::Tx(txid)],
645                                    };
646                                    if let Ok(peers) = rebroadcast_peers.get_connected_peers().await {
647                                        for peer in peers {
648                                            let _ = rebroadcast_peers
649                                                .send_to_peer(peer.id, inv.clone())
650                                                .await;
651                                        }
652                                    }
653                                    tracing::debug!("Rebroadcast tx {} to peers", txid);
654                                }
655                            }
656                        }
657                        _ = shutdown.changed() => {
658                            tracing::debug!("Rebroadcast task shutting down");
659                            break;
660                        }
661                    }
662                }
663            });
664        }
665
666        // ── Task 5: TCP peer keepalive ──────────────────────────
667        if let Some(ref tcp_mgr) = self.tcp_peer_manager {
668            let tcp = tcp_mgr.clone();
669            let mut shutdown = self.shutdown_rx.clone();
670            let guard = tracker.register();
671            tokio::spawn(async move {
672                let _guard = guard;
673                let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(120));
674                loop {
675                    tokio::select! {
676                        _ = interval.tick() => {
677                            let count = tcp.peer_count().await;
678                            if count > 0 {
679                                tracing::debug!("TCP peers alive: {}", count);
680                            }
681                        }
682                        _ = shutdown.changed() => {
683                            tracing::debug!("TCP keepalive task shutting down");
684                            break;
685                        }
686                    }
687                }
688            });
689        }
690
691        let total = tracker.total;
692        drop(tracker);
693        tracing::info!("Background tasks started ({} tasks)", total);
694    }
695
696    /// Process a peer event through the sync manager.
697    ///
698    /// This is the main entry point for handling P2P messages. It delegates
699    /// to the SyncManager, which returns a list of actions to execute.
700    pub async fn handle_peer_event(
701        &self,
702        event: PeerEvent,
703    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
704        let actions = {
705            let mut sm = self.sync_manager.write().await;
706            sm.on_peer_event(
707                event,
708                self.peer_manager.as_ref(),
709                self.block_store.as_ref(),
710                self.chain_state.as_ref(),
711                self.mempool_adapter.as_ref(),
712            )
713            .await?
714        };
715
716        // Execute returned actions
717        for action in actions {
718            match action {
719                SyncAction::ProcessBlock(block) => {
720                    match self.blockchain.validate_and_accept_block(&block).await {
721                        Ok(_) => {
722                            // Block accepted — feed fee estimator with confirmed tx fee rates.
723                            let info = self.chain_state.get_best_chain_tip().await;
724                            let height = info.map(|(_, h)| h).unwrap_or(0);
725
726                            let confirmed_fees: Vec<(abtc_domain::primitives::Amount, usize, u32)> =
727                                block
728                                    .transactions
729                                    .iter()
730                                    .filter_map(|tx| {
731                                        if tx.is_coinbase() {
732                                            return None;
733                                        }
734                                        // Approximate fee: we don't have input values readily,
735                                        // so use a heuristic — the mempool had the fee when we
736                                        // accepted the tx.  For now, estimate 1 sat/vB minimum
737                                        // as a placeholder; real fee calculation requires UTXO lookup.
738                                        let vsize = tx.compute_vsize() as usize;
739                                        let estimated_fee =
740                                            abtc_domain::primitives::Amount::from_sat(
741                                                vsize as i64, // ~1 sat/vB estimate
742                                            );
743                                        Some((estimated_fee, vsize, 1u32))
744                                    })
745                                    .collect();
746
747                            if !confirmed_fees.is_empty() {
748                                let mut est = self.fee_estimator.write().await;
749                                est.process_block(height, &confirmed_fees);
750                                tracing::debug!(
751                                    "Fee estimator updated: height={}, txs={}",
752                                    height,
753                                    confirmed_fees.len()
754                                );
755                            }
756                        }
757                        Err(e) => {
758                            tracing::warn!("Failed to accept block {}: {}", block.block_hash(), e);
759                        }
760                    }
761                }
762                SyncAction::ProcessTransaction(tx) => {
763                    if let Err(e) = self.blockchain.process_new_transaction(&tx).await {
764                        tracing::debug!("Failed to process tx {}: {}", tx.txid(), e);
765                    } else {
766                        // Valid transaction — add to mempool
767                        if let Err(e) = self.mempool_adapter.add_transaction(&tx).await {
768                            tracing::debug!("Failed to add tx {} to mempool: {}", tx.txid(), e);
769                        } else {
770                            // Relay to peers
771                            let _ = self.peer_manager.broadcast_transaction(&tx).await;
772                        }
773                    }
774                }
775                SyncAction::SendMessage(peer_id, msg) => {
776                    if let Err(e) = self.peer_manager.send_to_peer(peer_id, msg).await {
777                        tracing::debug!("Failed to send message to peer {}: {}", peer_id, e);
778                    }
779                }
780                SyncAction::AcceptedTransaction { tx, from_peer } => {
781                    // Transaction was already accepted into the mempool by the
782                    // SyncManager. Relay it to all other peers.
783                    tracing::info!(
784                        "Relaying accepted tx {} (from peer {})",
785                        tx.txid(),
786                        from_peer,
787                    );
788                    let _ = self.peer_manager.broadcast_transaction(&tx).await;
789                }
790                SyncAction::DisconnectPeer(peer_id) => {
791                    let _ = self.peer_manager.disconnect_peer(peer_id).await;
792                }
793            }
794        }
795
796        Ok(())
797    }
798
799    /// Stop the Bitcoin node gracefully.
800    ///
801    /// Sends the shutdown signal to all background tasks, stops the RPC
802    /// server, disconnects all peers, and logs the final state.
803    pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
804        tracing::info!("Stopping Agentic Bitcoin node");
805        self.running.store(false, Ordering::Release);
806
807        // 1. Signal all background tasks to stop
808        let _ = self.shutdown_tx.send(true);
809        tracing::info!("Shutdown signal sent to background tasks");
810
811        // 2. Wait briefly for tasks to exit (non-blocking check)
812        tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
813        let active = self.task_tracker.lock().unwrap().active_count();
814        if active > 0 {
815            tracing::info!("Waiting for {} background tasks to finish...", active);
816            // Give tasks a moment to wind down
817            tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
818        }
819        let remaining = self.task_tracker.lock().unwrap().active_count();
820        if remaining > 0 {
821            tracing::warn!("{} background tasks still running at shutdown", remaining);
822        } else {
823            tracing::info!("All background tasks stopped cleanly");
824        }
825
826        // 3. Stop RPC server
827        self.rpc_server.stop().await?;
828        tracing::info!("RPC server stopped");
829
830        // 4. Disconnect all peers
831        if let Ok(peers) = self.peer_manager.get_connected_peers().await {
832            let count = peers.len();
833            for peer in peers {
834                let _ = self.peer_manager.disconnect_peer(peer.id).await;
835            }
836            tracing::info!("Disconnected {} peers", count);
837        }
838
839        // 5. Log final sync state
840        let sm = self.sync_manager.read().await;
841        let idx = self.block_index.read().await;
842        let uptime = self.uptime_secs();
843        tracing::info!(
844            "Final state: sync={:?}, height={}, headers={}, uptime={}s",
845            sm.state(),
846            idx.best_height(),
847            idx.header_count(),
848            uptime,
849        );
850
851        tracing::info!("Agentic Bitcoin node stopped");
852        Ok(())
853    }
854
855    /// Get current node health status.
856    pub async fn health(&self) -> NodeHealth {
857        let (active_tasks, total_tasks) = {
858            let tracker = self.task_tracker.lock().unwrap();
859            let active_tasks = tracker.active_count() as u32;
860            let total_tasks = tracker.total;
861            (active_tasks, total_tasks)
862        };
863
864        let sm = self.sync_manager.read().await;
865        let sync_state = format!("{:?}", sm.state());
866        drop(sm);
867
868        let idx = self.block_index.read().await;
869        let block_height = idx.best_height();
870        drop(idx);
871
872        let mempool_size = self.mempool_adapter.size().await as u32;
873
874        let peer_count = self
875            .peer_manager
876            .get_connected_peers()
877            .await
878            .map(|p| p.len() as u32)
879            .unwrap_or(0);
880
881        let rpc_running = self.rpc_server.is_running();
882
883        NodeHealth {
884            is_running: self.running.load(Ordering::Acquire),
885            active_tasks,
886            total_tasks,
887            uptime_secs: self.uptime_secs(),
888            sync_state,
889            block_height,
890            mempool_size,
891            peer_count,
892            rpc_running,
893        }
894    }
895
896    /// Calculate node uptime in seconds.
897    fn uptime_secs(&self) -> u64 {
898        let start = self.start_time.load(Ordering::Relaxed);
899        let now = std::time::SystemTime::now()
900            .duration_since(std::time::UNIX_EPOCH)
901            .map(|d| d.as_secs())
902            .unwrap_or(0);
903        now.saturating_sub(start)
904    }
905
906    /// Get chain info
907    pub async fn get_chain_info(&self) -> Result<abtc_application::services::ChainInfo, String> {
908        self.blockchain.get_chain_info().await
909    }
910}
911
912/// Wait for a termination signal (SIGINT or SIGTERM).
913///
914/// On Unix, listens for both Ctrl+C (SIGINT) and SIGTERM (used by
915/// containers, systemd, process managers). On other platforms, only
916/// Ctrl+C is supported.
917async fn wait_for_shutdown_signal() {
918    #[cfg(unix)]
919    {
920        use tokio::signal::unix::{signal, SignalKind};
921        let mut sigterm =
922            signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
923        tokio::select! {
924            _ = tokio::signal::ctrl_c() => {
925                tracing::info!("Received SIGINT (Ctrl+C)");
926            }
927            _ = sigterm.recv() => {
928                tracing::info!("Received SIGTERM");
929            }
930        }
931    }
932    #[cfg(not(unix))]
933    {
934        tokio::signal::ctrl_c()
935            .await
936            .expect("failed to listen for Ctrl+C");
937        tracing::info!("Received Ctrl+C");
938    }
939}
940
941/// Entry point for the Bitcoin infrastructure layer.
942///
943/// Creates the node, starts all services, waits for a termination signal,
944/// then performs an orderly shutdown with a timeout.
945pub async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
946    let args = CliArgs::parse();
947
948    let node = BitcoinNode::new(args).await?;
949    node.start().await?;
950
951    // Wait for SIGINT or SIGTERM
952    wait_for_shutdown_signal().await;
953
954    // Graceful shutdown with timeout
955    tracing::info!(
956        "Initiating graceful shutdown (timeout: {}s)...",
957        SHUTDOWN_TIMEOUT_SECS
958    );
959
960    let shutdown_result = tokio::time::timeout(
961        tokio::time::Duration::from_secs(SHUTDOWN_TIMEOUT_SECS),
962        node.stop(),
963    )
964    .await;
965
966    match shutdown_result {
967        Ok(Ok(())) => {
968            tracing::info!("Clean shutdown completed");
969        }
970        Ok(Err(e)) => {
971            tracing::error!("Shutdown encountered error: {}", e);
972        }
973        Err(_) => {
974            tracing::error!(
975                "Shutdown timed out after {}s — forcing exit",
976                SHUTDOWN_TIMEOUT_SECS
977            );
978        }
979    }
980
981    Ok(())
982}
983
#[cfg(test)]
mod tests {
    use super::*;

    // ── Shared fixtures ─────────────────────────────────────────

    /// Baseline arguments used by most tests: regtest, in-memory storage,
    /// P2P disabled, wallet enabled, RPC port 18332.
    fn make_test_args() -> CliArgs {
        make_test_args_with_port(18332)
    }

    /// Create test args with a specific RPC port — use unique ports for tests
    /// that call `node.start()` to avoid AddrInUse when tests run in parallel.
    fn make_test_args_with_port(rpc_port: u16) -> CliArgs {
        CliArgs {
            network: "regtest".to_string(),
            datadir: "/tmp/bitcoin-test".to_string(),
            rpc_port,
            p2p_port: 18333,
            log_level: "warn".to_string(),
            max_mempool_mb: 300,
            enable_p2p: false,
            seed_peers: Vec::new(),
            storage_backend: "memory".to_string(),
            enable_wallet: true,
            address_type: "bech32".to_string(),
            wallet_file: None,
            signet_challenge: None,
        }
    }

    /// Allocate a unique port for a test that needs to bind the RPC server.
    /// Ports are handed out sequentially starting at 19400.
    fn unique_port() -> u16 {
        use std::sync::atomic::{AtomicU16, Ordering};
        static PORT: AtomicU16 = AtomicU16::new(19400);
        PORT.fetch_add(1, Ordering::Relaxed)
    }

    // ── Node construction tests ─────────────────────────────────

    #[tokio::test]
    async fn test_node_creation() {
        // Same as the baseline, but with debug logging requested.
        let mut args = make_test_args();
        args.log_level = "debug".to_string();

        let node = BitcoinNode::new(args).await;
        assert!(node.is_ok());
    }

    #[tokio::test]
    async fn test_node_chain_info() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();
        let info = node.get_chain_info().await.unwrap();
        assert_eq!(info.height, 0);
        assert_eq!(info.blocks, 1);
    }

    #[tokio::test]
    async fn test_node_has_block_index() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();

        // Block index should be initialized with genesis
        let idx = node.block_index.read().await;
        assert_eq!(idx.best_height(), 0);
        assert_eq!(idx.header_count(), 1);
    }

    #[tokio::test]
    async fn test_node_has_sync_manager() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();

        // Sync manager should start in Idle state
        let sm = node.sync_manager.read().await;
        assert_eq!(sm.state(), SyncState::Idle);
        assert_eq!(sm.blocks_remaining(), 0);
    }

    #[tokio::test]
    async fn test_node_has_fee_estimator() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();

        // Fee estimator should exist and return min relay fee initially
        let fe = node.fee_estimator.read().await;
        let estimate = fe.estimate_fee(6);
        // Min relay fee is 1.0 sat/vB = 1000 sat/kvB
        assert!(estimate > 0.0);
    }

    #[tokio::test]
    async fn test_node_has_rebroadcast_manager() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();

        let rb = node.rebroadcast_manager.read().await;
        assert_eq!(
            rb.tracked_count(),
            0,
            "No transactions should be tracked initially"
        );
    }

    #[tokio::test]
    async fn test_node_has_wallet_when_enabled() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();
        assert!(
            node.wallet.is_some(),
            "Wallet should be present when enable_wallet is true"
        );
    }

    #[tokio::test]
    async fn test_node_no_wallet_when_disabled() {
        let mut args = make_test_args();
        args.enable_wallet = false;

        let node = BitcoinNode::new(args).await.unwrap();
        assert!(
            node.wallet.is_none(),
            "Wallet should be absent when enable_wallet is false"
        );
    }

    #[tokio::test]
    async fn test_node_mempool_starts_empty() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();

        let info = node.mempool_adapter.get_mempool_info().await.unwrap();
        assert_eq!(info.size, 0);
        assert_eq!(info.bytes, 0);
    }

    #[tokio::test]
    async fn test_node_testnet_creation() {
        let mut args = make_test_args();
        args.network = "testnet".to_string();

        let node = BitcoinNode::new(args).await.unwrap();
        let info = node.get_chain_info().await.unwrap();
        assert_eq!(info.height, 0);
        assert_eq!(info.blocks, 1);
    }

    #[tokio::test]
    async fn test_node_with_wallet_persistence() {
        // Timestamp-based file name in the temp dir, so runs do not collide.
        let wallet_path = {
            let mut p = std::env::temp_dir();
            let id = std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos() as u64;
            p.push(format!("test_infra_wallet_{}.json", id));
            p.to_str().unwrap().to_string()
        };

        // Phase 1: Create node with persistent wallet, generate an address
        {
            let mut args = make_test_args();
            args.wallet_file = Some(wallet_path.clone());

            let node = BitcoinNode::new(args).await.unwrap();
            assert!(node.wallet.is_some());

            // Generate an address
            let wallet = node.wallet.as_ref().unwrap();
            let addr = wallet.get_new_address(Some("persist-test")).await.unwrap();
            assert!(addr.starts_with("tb1q") || addr.starts_with("bcrt1q"));
        }

        // Wallet file should exist
        assert!(std::path::Path::new(&wallet_path).exists());

        // Phase 2: Create new node with same wallet file, verify it loads
        {
            let mut args = make_test_args();
            args.wallet_file = Some(wallet_path.clone());

            let node = BitcoinNode::new(args).await.unwrap();
            let wallet = node.wallet.as_ref().unwrap();

            // Should be able to list unspent (wallet is loaded, has 1 key)
            let unspent = wallet.list_unspent(0, None).await.unwrap();
            // No UTXOs, but the wallet loaded without error — that confirms persistence
            assert_eq!(unspent.len(), 0);
        }

        // Cleanup
        let _ = tokio::fs::remove_file(&wallet_path).await;
    }

    #[tokio::test]
    async fn test_node_wallet_file_none_is_in_memory() {
        let args = make_test_args(); // wallet_file is None

        let node = BitcoinNode::new(args).await.unwrap();
        assert!(node.wallet.is_some());

        // Generate an address — should work fine in-memory
        let wallet = node.wallet.as_ref().unwrap();
        let addr = wallet.get_new_address(None).await.unwrap();
        assert!(!addr.is_empty());
    }

    #[tokio::test]
    async fn test_node_signet_creation() {
        let mut args = make_test_args();
        args.network = "signet".to_string();

        let node = BitcoinNode::new(args).await.unwrap();
        let info = node.get_chain_info().await.unwrap();
        assert_eq!(info.height, 0);
        assert_eq!(info.blocks, 1);
    }

    #[tokio::test]
    async fn test_node_custom_signet_challenge() {
        let mut args = make_test_args();
        args.network = "signet".to_string();
        // OP_TRUE as a trivial custom challenge
        args.signet_challenge = Some("51".to_string());

        let node = BitcoinNode::new(args).await.unwrap();
        let info = node.get_chain_info().await.unwrap();
        assert_eq!(info.height, 0);
    }

    #[tokio::test]
    async fn test_node_invalid_signet_challenge_hex() {
        let mut args = make_test_args();
        args.network = "signet".to_string();
        args.signet_challenge = Some("not_valid_hex".to_string());

        let result = BitcoinNode::new(args).await;
        assert!(result.is_err(), "Invalid hex should produce an error");
    }

    // ── Infrastructure hardening tests ──────────────────────────

    #[test]
    fn test_task_tracker_register_and_count() {
        let mut tracker = TaskTracker::new();
        assert_eq!(tracker.active_count(), 0);
        assert_eq!(tracker.total, 0);

        let g1 = tracker.register();
        assert_eq!(tracker.active_count(), 1);
        assert_eq!(tracker.total, 1);

        let g2 = tracker.register();
        assert_eq!(tracker.active_count(), 2);
        assert_eq!(tracker.total, 2);

        drop(g1);
        assert_eq!(tracker.active_count(), 1);

        drop(g2);
        assert_eq!(tracker.active_count(), 0);
        // total stays at 2 — it's a high-water mark
        assert_eq!(tracker.total, 2);
    }

    #[test]
    fn test_task_guard_decrements_on_drop() {
        let mut tracker = TaskTracker::new();
        {
            let _g = tracker.register();
            assert_eq!(tracker.active_count(), 1);
        }
        // guard dropped at end of scope
        assert_eq!(tracker.active_count(), 0);
    }

    #[tokio::test]
    async fn test_node_starts_not_running() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();
        // Before start(), running should be false
        assert!(!node.running.load(Ordering::Acquire));
    }

    #[tokio::test]
    async fn test_node_start_sets_running() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();
        assert!(node.running.load(Ordering::Acquire));
        // Clean up
        let _ = node.stop().await;
    }

    #[tokio::test]
    async fn test_node_stop_clears_running() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();
        assert!(node.running.load(Ordering::Acquire));

        node.stop().await.unwrap();
        assert!(!node.running.load(Ordering::Acquire));
    }

    #[tokio::test]
    async fn test_shutdown_signal_stops_tasks() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();

        // Background tasks should be running
        let active_before = node.task_tracker.lock().unwrap().active_count();
        assert!(active_before > 0, "Expected background tasks to be running");

        // Send shutdown signal
        let _ = node.shutdown_tx.send(true);
        // Give tasks a moment to exit
        tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;

        let active_after = node.task_tracker.lock().unwrap().active_count();
        assert_eq!(
            active_after, 0,
            "All tasks should have exited after shutdown signal"
        );

        let _ = node.stop().await;
    }

    #[tokio::test]
    async fn test_health_before_start() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();
        let health = node.health().await;

        assert!(!health.is_running);
        assert_eq!(health.active_tasks, 0);
        assert_eq!(health.total_tasks, 0);
        assert_eq!(health.block_height, 0);
        assert_eq!(health.mempool_size, 0);
        assert_eq!(health.peer_count, 0);
        assert_eq!(health.sync_state, "Idle");
    }

    #[tokio::test]
    async fn test_health_after_start() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();

        let health = node.health().await;
        assert!(health.is_running);
        assert!(health.active_tasks > 0, "Background tasks should be active");
        assert!(health.total_tasks > 0, "Total tasks should be non-zero");
        assert!(health.rpc_running);

        let _ = node.stop().await;
    }

    #[tokio::test]
    async fn test_health_after_stop() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();
        node.stop().await.unwrap();

        let health = node.health().await;
        assert!(!health.is_running);
        assert_eq!(health.active_tasks, 0, "No tasks should remain after stop");
    }

    #[tokio::test]
    async fn test_uptime_increases() {
        let node = BitcoinNode::new(make_test_args()).await.unwrap();
        let t1 = node.uptime_secs();
        // Uptime has one-second resolution, so wait slightly longer than that.
        tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await;
        let t2 = node.uptime_secs();
        assert!(t2 > t1, "Uptime should increase over time");
    }

    #[tokio::test]
    async fn test_node_health_fields_consistent() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();

        let health = node.health().await;

        // active_tasks should never exceed total_tasks
        assert!(health.active_tasks <= health.total_tasks);
        // uptime should be small (just started)
        assert!(health.uptime_secs < 60);

        let _ = node.stop().await;
    }

    #[tokio::test]
    async fn test_double_stop_is_safe() {
        let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
            .await
            .unwrap();
        node.start().await.unwrap();

        // First stop
        node.stop().await.unwrap();
        // Second stop should not panic or error
        node.stop().await.unwrap();
    }

    #[test]
    fn test_node_health_clone() {
        let health = NodeHealth {
            is_running: true,
            active_tasks: 5,
            total_tasks: 5,
            uptime_secs: 100,
            sync_state: "Synced".to_string(),
            block_height: 800_000,
            mempool_size: 42,
            peer_count: 8,
            rpc_running: true,
        };

        let cloned = health.clone();
        assert!(cloned.is_running);
        assert_eq!(cloned.active_tasks, 5);
        assert_eq!(cloned.block_height, 800_000);
        assert_eq!(cloned.peer_count, 8);
    }

    #[test]
    fn test_node_health_debug() {
        let health = NodeHealth {
            is_running: false,
            active_tasks: 0,
            total_tasks: 5,
            uptime_secs: 0,
            sync_state: "Idle".to_string(),
            block_height: 0,
            mempool_size: 0,
            peer_count: 0,
            rpc_running: false,
        };

        let debug_str = format!("{:?}", health);
        assert!(debug_str.contains("NodeHealth"));
        assert!(debug_str.contains("is_running: false"));
    }
}