1use abtc_adapters::{
14 FileBasedWalletStore, InMemoryBlockStore, InMemoryChainStateStore, InMemoryMempool,
15 InMemoryWallet, JsonRpcServer, PersistentWallet, SimpleMiner, StubPeerManager, TcpPeerManager,
16};
17use abtc_application::block_index::BlockIndex;
18use abtc_application::fee_estimator::FeeEstimator;
19use abtc_application::handlers::{BlockchainRpcHandler, MiningRpcHandler, WalletRpcHandler};
20use abtc_application::net_processing::{SyncAction, SyncManager, SyncState};
21use abtc_application::rebroadcast::RebroadcastManager;
22use abtc_application::services::{BlockchainService, MempoolService, MiningService};
23use abtc_domain::wallet::address::AddressType;
24use abtc_domain::ChainParams;
25use abtc_ports::{
26 BlockStore, ChainStateStore, MempoolPort, PeerEvent, PeerManager, RpcServer, WalletPort,
27};
28use clap::Parser;
29use std::sync::atomic::{AtomicBool, AtomicU64, Ordering};
30use std::sync::Arc;
31use tokio::sync::RwLock;
32use tracing_subscriber::EnvFilter;
33
34const SHUTDOWN_TIMEOUT_SECS: u64 = 10;
37
38#[derive(Debug, Clone)]
40pub struct NodeHealth {
41 pub is_running: bool,
43 pub active_tasks: u32,
45 pub total_tasks: u32,
47 pub uptime_secs: u64,
49 pub sync_state: String,
51 pub block_height: u32,
53 pub mempool_size: u32,
55 pub peer_count: u32,
57 pub rpc_running: bool,
59}
60
61struct TaskTracker {
63 active: Arc<AtomicU64>,
65 total: u32,
67}
68
69impl TaskTracker {
70 fn new() -> Self {
71 TaskTracker {
72 active: Arc::new(AtomicU64::new(0)),
73 total: 0,
74 }
75 }
76
77 fn register(&mut self) -> TaskGuard {
79 self.total += 1;
80 self.active.fetch_add(1, Ordering::Relaxed);
81 TaskGuard {
82 active: self.active.clone(),
83 }
84 }
85
86 fn active_count(&self) -> u64 {
87 self.active.load(Ordering::Relaxed)
88 }
89}
90
91struct TaskGuard {
93 active: Arc<AtomicU64>,
94}
95
96impl Drop for TaskGuard {
97 fn drop(&mut self) {
98 self.active.fetch_sub(1, Ordering::Relaxed);
99 }
100}
101
102#[derive(Parser, Debug)]
104#[command(
105 name = "Agentic Bitcoin",
106 about = "A Bitcoin Core reimplementation in Rust with hexagonal architecture",
107 version
108)]
109pub struct CliArgs {
110 #[arg(long, default_value = "mainnet")]
112 pub network: String,
113
114 #[arg(long, default_value = "~/.bitcoin")]
116 pub datadir: String,
117
118 #[arg(long, default_value = "8332")]
120 pub rpc_port: u16,
121
122 #[arg(long, default_value = "8333")]
124 pub p2p_port: u16,
125
126 #[arg(long, default_value = "info")]
128 pub log_level: String,
129
130 #[arg(long, default_value = "300")]
132 pub max_mempool_mb: u64,
133
134 #[arg(long, default_value = "false")]
136 pub enable_p2p: bool,
137
138 #[arg(long, value_delimiter = ',')]
140 pub seed_peers: Vec<String>,
141
142 #[arg(long, default_value = "memory")]
144 pub storage_backend: String,
145
146 #[arg(long, default_value = "true")]
148 pub enable_wallet: bool,
149
150 #[arg(long, default_value = "bech32")]
152 pub address_type: String,
153
154 #[arg(long)]
158 pub wallet_file: Option<String>,
159
160 #[arg(long)]
164 pub signet_challenge: Option<String>,
165}
166
167pub struct BitcoinNode {
169 pub blockchain: Arc<BlockchainService>,
170 pub mempool: Arc<MempoolService>,
171 pub mining: Arc<MiningService>,
172 pub rpc_server: Arc<JsonRpcServer>,
173 pub peer_manager: Arc<dyn PeerManager>,
174 pub mempool_adapter: Arc<InMemoryMempool>,
175 pub block_index: Arc<RwLock<BlockIndex>>,
176 pub sync_manager: Arc<RwLock<SyncManager>>,
177 pub fee_estimator: Arc<RwLock<FeeEstimator>>,
179 pub rebroadcast_manager: Arc<RwLock<RebroadcastManager>>,
181 block_store: Arc<dyn BlockStore>,
183 chain_state: Arc<dyn ChainStateStore>,
185 tcp_peer_manager: Option<Arc<TcpPeerManager>>,
187 seed_peers: Vec<String>,
189 pub wallet: Option<Arc<dyn WalletPort>>,
191 shutdown_tx: tokio::sync::watch::Sender<bool>,
193 shutdown_rx: tokio::sync::watch::Receiver<bool>,
195 task_tracker: Arc<std::sync::Mutex<TaskTracker>>,
197 start_time: Arc<AtomicU64>,
199 running: Arc<AtomicBool>,
201}
202
203impl BitcoinNode {
204 pub async fn new(args: CliArgs) -> Result<Self, Box<dyn std::error::Error + Send + Sync>> {
206 let _ = tracing_subscriber::fmt()
210 .with_env_filter(
211 EnvFilter::try_from_default_env()
212 .unwrap_or_else(|_| EnvFilter::new(&args.log_level)),
213 )
214 .try_init();
215
216 tracing::info!("Initializing Agentic Bitcoin node on {}", args.network);
217
218 let network = match args.network.as_str() {
220 "mainnet" => abtc_domain::Network::Mainnet,
221 "testnet" => abtc_domain::Network::Testnet,
222 "regtest" => abtc_domain::Network::Regtest,
223 "signet" => abtc_domain::Network::Signet,
224 _ => {
225 return Err(format!("Unknown network: {}", args.network).into());
226 }
227 };
228
229 let mut chain_params = ChainParams::for_network(network);
230
231 if let Some(ref challenge_hex) = args.signet_challenge {
233 match hex::decode(challenge_hex) {
234 Ok(challenge_bytes) => {
235 chain_params.consensus.signet_challenge = Some(challenge_bytes);
236 tracing::info!("Using custom signet challenge: {}", challenge_hex);
237 }
238 Err(e) => {
239 return Err(format!(
240 "Invalid --signet-challenge hex: {}: {}",
241 challenge_hex, e
242 )
243 .into());
244 }
245 }
246 }
247
248 tracing::info!("Using chain parameters for {}", network);
249
250 let (block_store, chain_state): (Arc<dyn BlockStore>, Arc<dyn ChainStateStore>) =
252 match args.storage_backend.as_str() {
253 #[cfg(feature = "rocksdb-storage")]
254 "rocksdb" => {
255 use abtc_adapters::{RocksDbBlockStore, RocksDbChainStateStore};
256 use std::path::Path;
257
258 let datadir = args
259 .datadir
260 .replace('~', &std::env::var("HOME").unwrap_or_default());
261 let blocks_path = format!("{}/blocks", datadir);
262 let chainstate_path = format!("{}/chainstate", datadir);
263
264 std::fs::create_dir_all(&blocks_path)?;
266 std::fs::create_dir_all(&chainstate_path)?;
267
268 tracing::info!(
269 "Using RocksDB storage (blocks: {}, chainstate: {})",
270 blocks_path,
271 chainstate_path
272 );
273
274 let bs = Arc::new(RocksDbBlockStore::open(Path::new(&blocks_path))?);
275 let cs = Arc::new(RocksDbChainStateStore::open(Path::new(&chainstate_path))?);
276 (bs as Arc<dyn BlockStore>, cs as Arc<dyn ChainStateStore>)
277 }
278 #[cfg(not(feature = "rocksdb-storage"))]
279 "rocksdb" => {
280 return Err("RocksDB storage requires the 'rocksdb-storage' feature. \
281 Build with: cargo build --features rocksdb-storage"
282 .into());
283 }
284 _ => {
285 tracing::info!("Using in-memory storage backend");
286 let bs = Arc::new(InMemoryBlockStore::new());
287 let cs = Arc::new(InMemoryChainStateStore::new());
288 (bs as Arc<dyn BlockStore>, cs as Arc<dyn ChainStateStore>)
289 }
290 };
291
292 let rpc_server = Arc::new(JsonRpcServer::new(args.rpc_port));
293 let mempool_adapter = Arc::new(InMemoryMempool::with_max_bytes(
294 args.max_mempool_mb * 1_000_000,
295 ));
296
297 let (peer_manager, tcp_peer_manager): (Arc<dyn PeerManager>, Option<Arc<TcpPeerManager>>) =
299 if args.enable_p2p {
300 let local_addr: std::net::SocketAddr =
301 format!("0.0.0.0:{}", args.p2p_port).parse()?;
302 let tcp_mgr = Arc::new(TcpPeerManager::new(local_addr));
303 tracing::info!("P2P networking enabled on port {}", args.p2p_port);
304 (tcp_mgr.clone() as Arc<dyn PeerManager>, Some(tcp_mgr))
305 } else {
306 let stub = Arc::new(StubPeerManager::new());
307 tracing::info!("P2P networking disabled (stub mode)");
308 (stub as Arc<dyn PeerManager>, None)
309 };
310
311 let genesis = chain_params.genesis_block();
315 let genesis_hash = genesis.block_hash();
316
317 if !block_store.has_block(&genesis_hash).await.unwrap_or(false) {
319 block_store
320 .store_block(&genesis, 0)
321 .await
322 .map_err(|e| format!("Failed to store genesis block: {}", e))?;
323 }
324
325 chain_state
327 .write_chain_tip(genesis_hash, 0)
328 .await
329 .map_err(|e| format!("Failed to set genesis chain tip: {}", e))?;
330
331 tracing::info!(
332 "Initialized blockchain with genesis block: {}",
333 genesis_hash
334 );
335
336 let mut block_index = BlockIndex::new();
338 block_index.init_genesis(genesis.header.clone());
339 block_index.load_checkpoints(&chain_params.checkpoints);
340 let block_index = Arc::new(RwLock::new(block_index));
341
342 tracing::info!("Block index initialized with genesis header");
343
344 let sync_manager = Arc::new(RwLock::new(SyncManager::new(block_index.clone())));
346
347 let blockchain = Arc::new(BlockchainService::with_params(
349 block_store.clone(),
350 chain_state.clone(),
351 peer_manager.clone(),
352 chain_params.consensus.clone(),
353 ));
354
355 let mempool = Arc::new(MempoolService::new(
357 mempool_adapter.clone() as Arc<dyn MempoolPort>,
358 chain_state.clone(),
359 ));
360
361 let template_provider = Arc::new(SimpleMiner::new());
362 let mining = Arc::new(MiningService::with_params(
363 template_provider,
364 blockchain.clone(),
365 chain_params.consensus.clone(),
366 ));
367
368 let fee_estimator = Arc::new(RwLock::new(FeeEstimator::new()));
370
371 let rebroadcast_manager = Arc::new(RwLock::new(RebroadcastManager::new()));
373
374 let bc_handler = Box::new(BlockchainRpcHandler::new(
376 blockchain.clone(),
377 mempool.clone(),
378 fee_estimator.clone(),
379 chain_state.clone(),
380 block_index.clone(),
381 ));
382 rpc_server.register_handler(bc_handler).await?;
383
384 let mining_handler = Box::new(MiningRpcHandler::new(mining.clone()));
385 rpc_server.register_handler(mining_handler).await?;
386
387 let mainnet = matches!(network, abtc_domain::Network::Mainnet);
389 let wallet: Option<Arc<dyn WalletPort>> = if args.enable_wallet {
390 let addr_type = match args.address_type.as_str() {
391 "legacy" | "p2pkh" => AddressType::P2PKH,
392 "p2sh-segwit" | "p2sh" => AddressType::P2shP2wpkh,
393 _ => AddressType::P2WPKH, };
395
396 let in_memory = Arc::new(InMemoryWallet::new(mainnet, addr_type));
397
398 let wallet: Arc<dyn WalletPort> = if let Some(ref wallet_path) = args.wallet_file {
399 let store = Arc::new(FileBasedWalletStore::new(wallet_path));
401 let persistent = PersistentWallet::new(in_memory, store)
402 .await
403 .map_err(|e| format!("Failed to initialize persistent wallet: {}", e))?;
404 tracing::info!(
405 "Wallet enabled (address type: {}, persistence: {})",
406 args.address_type,
407 wallet_path
408 );
409 Arc::new(persistent)
410 } else {
411 tracing::info!(
413 "Wallet enabled (address type: {}, in-memory only)",
414 args.address_type
415 );
416 in_memory
417 };
418
419 let wallet_handler = Box::new(WalletRpcHandler::new(wallet.clone()));
421 rpc_server.register_handler(wallet_handler).await?;
422
423 Some(wallet)
424 } else {
425 tracing::info!("Wallet disabled");
426 None
427 };
428
429 tracing::info!("Agentic Bitcoin node initialized successfully");
430
431 let (shutdown_tx, shutdown_rx) = tokio::sync::watch::channel(false);
432
433 let start_time = std::time::SystemTime::now()
434 .duration_since(std::time::UNIX_EPOCH)
435 .map(|d| d.as_secs())
436 .unwrap_or(0);
437
438 Ok(BitcoinNode {
439 blockchain,
440 mempool,
441 mining,
442 rpc_server,
443 peer_manager,
444 mempool_adapter,
445 block_index,
446 sync_manager,
447 fee_estimator,
448 rebroadcast_manager,
449 block_store,
450 chain_state,
451 tcp_peer_manager,
452 seed_peers: args.seed_peers,
453 wallet,
454 shutdown_tx,
455 shutdown_rx,
456 task_tracker: Arc::new(std::sync::Mutex::new(TaskTracker::new())),
457 start_time: Arc::new(AtomicU64::new(start_time)),
458 running: Arc::new(AtomicBool::new(false)),
459 })
460 }
461
462 pub async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
464 tracing::info!("Starting Agentic Bitcoin node");
465 self.running.store(true, Ordering::Release);
466
467 self.rpc_server.start().await?;
469 tracing::info!("RPC server started on port {}", self.rpc_server.get_port());
470
471 if !self.seed_peers.is_empty() {
473 for seed in &self.seed_peers {
474 match seed.parse::<std::net::SocketAddr>() {
475 Ok(addr) => match self.peer_manager.connect_peer(addr).await {
476 Ok(id) => tracing::info!("Connected to seed peer {} (id: {})", addr, id),
477 Err(e) => tracing::warn!("Failed to connect to seed peer {}: {}", addr, e),
478 },
479 Err(e) => tracing::warn!("Invalid seed peer address '{}': {}", seed, e),
480 }
481 }
482 }
483
484 self.start_background_tasks().await;
486
487 Ok(())
488 }
489
490 async fn start_background_tasks(&self) {
492 let mut tracker = self.task_tracker.lock().unwrap();
493
494 {
496 let mempool_adapter = self.mempool_adapter.clone();
497 let mut shutdown = self.shutdown_rx.clone();
498 let guard = tracker.register();
499 tokio::spawn(async move {
500 let _guard = guard;
501 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(60));
502 loop {
503 tokio::select! {
504 _ = interval.tick() => {
505 let size = mempool_adapter.size().await;
506 if size > 0 {
507 tracing::info!("Mempool: {} transactions", size);
508 }
509 }
510 _ = shutdown.changed() => {
511 tracing::debug!("Mempool maintenance task shutting down");
512 break;
513 }
514 }
515 }
516 });
517 }
518
519 {
521 let peer_manager = self.peer_manager.clone();
522 let mut shutdown = self.shutdown_rx.clone();
523 let guard = tracker.register();
524 tokio::spawn(async move {
525 let _guard = guard;
526 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(30));
527 loop {
528 tokio::select! {
529 _ = interval.tick() => {
530 match peer_manager.get_connected_peers().await {
531 Ok(peers) => {
532 if !peers.is_empty() {
533 tracing::debug!("Connected peers: {}", peers.len());
534 }
535 }
536 Err(e) => tracing::debug!("Error checking peers: {}", e),
537 }
538 }
539 _ = shutdown.changed() => {
540 tracing::debug!("Peer maintenance task shutting down");
541 break;
542 }
543 }
544 }
545 });
546 }
547
548 {
550 let sync_manager = self.sync_manager.clone();
551 let block_index = self.block_index.clone();
552 let mut shutdown = self.shutdown_rx.clone();
553 let guard = tracker.register();
554 tokio::spawn(async move {
555 let _guard = guard;
556 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(10));
557 loop {
558 tokio::select! {
559 _ = interval.tick() => {
560 let sm = sync_manager.read().await;
561 let state = sm.state();
562 let remaining = sm.blocks_remaining();
563 drop(sm);
564
565 let idx = block_index.read().await;
566 let height = idx.best_height();
567 let headers = idx.header_count();
568 drop(idx);
569
570 match state {
571 SyncState::HeaderSync => {
572 tracing::info!(
573 "Sync: downloading headers ({} known, height {})",
574 headers, height
575 );
576 }
577 SyncState::BlockSync => {
578 tracing::info!(
579 "Sync: downloading blocks ({} remaining, index height {})",
580 remaining, height
581 );
582 }
583 SyncState::Synced => {
584 tracing::debug!(
585 "Sync: fully synced at height {} ({} headers)",
586 height, headers
587 );
588 }
589 SyncState::Idle => {}
590 }
591 }
592 _ = shutdown.changed() => {
593 tracing::debug!("Sync reporter task shutting down");
594 break;
595 }
596 }
597 }
598 });
599 }
600
601 {
603 let rebroadcast_mgr = self.rebroadcast_manager.clone();
604 let rebroadcast_mempool = self.mempool_adapter.clone();
605 let rebroadcast_peers = self.peer_manager.clone();
606 let mut shutdown = self.shutdown_rx.clone();
607 let guard = tracker.register();
608 tokio::spawn(async move {
609 let _guard = guard;
610 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(5 * 60));
611 loop {
612 tokio::select! {
613 _ = interval.tick() => {
614 let now = std::time::SystemTime::now()
615 .duration_since(std::time::UNIX_EPOCH)
616 .map(|d| d.as_secs())
617 .unwrap_or(0);
618
619 let mempool_ref = &rebroadcast_mempool;
620 let mut mgr = rebroadcast_mgr.write().await;
621
622 let tracked_txids: Vec<abtc_domain::primitives::Txid> = mgr
623 .check_rebroadcast(now, |txid| {
624 let _ = txid;
625 true
626 })
627 .into_iter()
628 .filter_map(|action| match action {
629 abtc_application::rebroadcast::RebroadcastAction::Reannounce(txid) => {
630 Some(txid)
631 }
632 abtc_application::rebroadcast::RebroadcastAction::Abandon(txid) => {
633 tracing::info!("Abandoning rebroadcast of tx {}", txid);
634 None
635 }
636 })
637 .collect();
638
639 drop(mgr);
640
641 for txid in tracked_txids {
642 if let Ok(Some(_)) = mempool_ref.get_transaction(&txid).await {
643 let inv = abtc_ports::NetworkMessage::Inv {
644 items: vec![abtc_ports::InventoryItem::Tx(txid)],
645 };
646 if let Ok(peers) = rebroadcast_peers.get_connected_peers().await {
647 for peer in peers {
648 let _ = rebroadcast_peers
649 .send_to_peer(peer.id, inv.clone())
650 .await;
651 }
652 }
653 tracing::debug!("Rebroadcast tx {} to peers", txid);
654 }
655 }
656 }
657 _ = shutdown.changed() => {
658 tracing::debug!("Rebroadcast task shutting down");
659 break;
660 }
661 }
662 }
663 });
664 }
665
666 if let Some(ref tcp_mgr) = self.tcp_peer_manager {
668 let tcp = tcp_mgr.clone();
669 let mut shutdown = self.shutdown_rx.clone();
670 let guard = tracker.register();
671 tokio::spawn(async move {
672 let _guard = guard;
673 let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(120));
674 loop {
675 tokio::select! {
676 _ = interval.tick() => {
677 let count = tcp.peer_count().await;
678 if count > 0 {
679 tracing::debug!("TCP peers alive: {}", count);
680 }
681 }
682 _ = shutdown.changed() => {
683 tracing::debug!("TCP keepalive task shutting down");
684 break;
685 }
686 }
687 }
688 });
689 }
690
691 let total = tracker.total;
692 drop(tracker);
693 tracing::info!("Background tasks started ({} tasks)", total);
694 }
695
696 pub async fn handle_peer_event(
701 &self,
702 event: PeerEvent,
703 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
704 let actions = {
705 let mut sm = self.sync_manager.write().await;
706 sm.on_peer_event(
707 event,
708 self.peer_manager.as_ref(),
709 self.block_store.as_ref(),
710 self.chain_state.as_ref(),
711 self.mempool_adapter.as_ref(),
712 )
713 .await?
714 };
715
716 for action in actions {
718 match action {
719 SyncAction::ProcessBlock(block) => {
720 match self.blockchain.validate_and_accept_block(&block).await {
721 Ok(_) => {
722 let info = self.chain_state.get_best_chain_tip().await;
724 let height = info.map(|(_, h)| h).unwrap_or(0);
725
726 let confirmed_fees: Vec<(abtc_domain::primitives::Amount, usize, u32)> =
727 block
728 .transactions
729 .iter()
730 .filter_map(|tx| {
731 if tx.is_coinbase() {
732 return None;
733 }
734 let vsize = tx.compute_vsize() as usize;
739 let estimated_fee =
740 abtc_domain::primitives::Amount::from_sat(
741 vsize as i64, );
743 Some((estimated_fee, vsize, 1u32))
744 })
745 .collect();
746
747 if !confirmed_fees.is_empty() {
748 let mut est = self.fee_estimator.write().await;
749 est.process_block(height, &confirmed_fees);
750 tracing::debug!(
751 "Fee estimator updated: height={}, txs={}",
752 height,
753 confirmed_fees.len()
754 );
755 }
756 }
757 Err(e) => {
758 tracing::warn!("Failed to accept block {}: {}", block.block_hash(), e);
759 }
760 }
761 }
762 SyncAction::ProcessTransaction(tx) => {
763 if let Err(e) = self.blockchain.process_new_transaction(&tx).await {
764 tracing::debug!("Failed to process tx {}: {}", tx.txid(), e);
765 } else {
766 if let Err(e) = self.mempool_adapter.add_transaction(&tx).await {
768 tracing::debug!("Failed to add tx {} to mempool: {}", tx.txid(), e);
769 } else {
770 let _ = self.peer_manager.broadcast_transaction(&tx).await;
772 }
773 }
774 }
775 SyncAction::SendMessage(peer_id, msg) => {
776 if let Err(e) = self.peer_manager.send_to_peer(peer_id, msg).await {
777 tracing::debug!("Failed to send message to peer {}: {}", peer_id, e);
778 }
779 }
780 SyncAction::AcceptedTransaction { tx, from_peer } => {
781 tracing::info!(
784 "Relaying accepted tx {} (from peer {})",
785 tx.txid(),
786 from_peer,
787 );
788 let _ = self.peer_manager.broadcast_transaction(&tx).await;
789 }
790 SyncAction::DisconnectPeer(peer_id) => {
791 let _ = self.peer_manager.disconnect_peer(peer_id).await;
792 }
793 }
794 }
795
796 Ok(())
797 }
798
799 pub async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
804 tracing::info!("Stopping Agentic Bitcoin node");
805 self.running.store(false, Ordering::Release);
806
807 let _ = self.shutdown_tx.send(true);
809 tracing::info!("Shutdown signal sent to background tasks");
810
811 tokio::time::sleep(tokio::time::Duration::from_millis(100)).await;
813 let active = self.task_tracker.lock().unwrap().active_count();
814 if active > 0 {
815 tracing::info!("Waiting for {} background tasks to finish...", active);
816 tokio::time::sleep(tokio::time::Duration::from_secs(1)).await;
818 }
819 let remaining = self.task_tracker.lock().unwrap().active_count();
820 if remaining > 0 {
821 tracing::warn!("{} background tasks still running at shutdown", remaining);
822 } else {
823 tracing::info!("All background tasks stopped cleanly");
824 }
825
826 self.rpc_server.stop().await?;
828 tracing::info!("RPC server stopped");
829
830 if let Ok(peers) = self.peer_manager.get_connected_peers().await {
832 let count = peers.len();
833 for peer in peers {
834 let _ = self.peer_manager.disconnect_peer(peer.id).await;
835 }
836 tracing::info!("Disconnected {} peers", count);
837 }
838
839 let sm = self.sync_manager.read().await;
841 let idx = self.block_index.read().await;
842 let uptime = self.uptime_secs();
843 tracing::info!(
844 "Final state: sync={:?}, height={}, headers={}, uptime={}s",
845 sm.state(),
846 idx.best_height(),
847 idx.header_count(),
848 uptime,
849 );
850
851 tracing::info!("Agentic Bitcoin node stopped");
852 Ok(())
853 }
854
855 pub async fn health(&self) -> NodeHealth {
857 let (active_tasks, total_tasks) = {
858 let tracker = self.task_tracker.lock().unwrap();
859 let active_tasks = tracker.active_count() as u32;
860 let total_tasks = tracker.total;
861 (active_tasks, total_tasks)
862 };
863
864 let sm = self.sync_manager.read().await;
865 let sync_state = format!("{:?}", sm.state());
866 drop(sm);
867
868 let idx = self.block_index.read().await;
869 let block_height = idx.best_height();
870 drop(idx);
871
872 let mempool_size = self.mempool_adapter.size().await as u32;
873
874 let peer_count = self
875 .peer_manager
876 .get_connected_peers()
877 .await
878 .map(|p| p.len() as u32)
879 .unwrap_or(0);
880
881 let rpc_running = self.rpc_server.is_running();
882
883 NodeHealth {
884 is_running: self.running.load(Ordering::Acquire),
885 active_tasks,
886 total_tasks,
887 uptime_secs: self.uptime_secs(),
888 sync_state,
889 block_height,
890 mempool_size,
891 peer_count,
892 rpc_running,
893 }
894 }
895
896 fn uptime_secs(&self) -> u64 {
898 let start = self.start_time.load(Ordering::Relaxed);
899 let now = std::time::SystemTime::now()
900 .duration_since(std::time::UNIX_EPOCH)
901 .map(|d| d.as_secs())
902 .unwrap_or(0);
903 now.saturating_sub(start)
904 }
905
906 pub async fn get_chain_info(&self) -> Result<abtc_application::services::ChainInfo, String> {
908 self.blockchain.get_chain_info().await
909 }
910}
911
912async fn wait_for_shutdown_signal() {
918 #[cfg(unix)]
919 {
920 use tokio::signal::unix::{signal, SignalKind};
921 let mut sigterm =
922 signal(SignalKind::terminate()).expect("failed to install SIGTERM handler");
923 tokio::select! {
924 _ = tokio::signal::ctrl_c() => {
925 tracing::info!("Received SIGINT (Ctrl+C)");
926 }
927 _ = sigterm.recv() => {
928 tracing::info!("Received SIGTERM");
929 }
930 }
931 }
932 #[cfg(not(unix))]
933 {
934 tokio::signal::ctrl_c()
935 .await
936 .expect("failed to listen for Ctrl+C");
937 tracing::info!("Received Ctrl+C");
938 }
939}
940
941pub async fn run() -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
946 let args = CliArgs::parse();
947
948 let node = BitcoinNode::new(args).await?;
949 node.start().await?;
950
951 wait_for_shutdown_signal().await;
953
954 tracing::info!(
956 "Initiating graceful shutdown (timeout: {}s)...",
957 SHUTDOWN_TIMEOUT_SECS
958 );
959
960 let shutdown_result = tokio::time::timeout(
961 tokio::time::Duration::from_secs(SHUTDOWN_TIMEOUT_SECS),
962 node.stop(),
963 )
964 .await;
965
966 match shutdown_result {
967 Ok(Ok(())) => {
968 tracing::info!("Clean shutdown completed");
969 }
970 Ok(Err(e)) => {
971 tracing::error!("Shutdown encountered error: {}", e);
972 }
973 Err(_) => {
974 tracing::error!(
975 "Shutdown timed out after {}s — forcing exit",
976 SHUTDOWN_TIMEOUT_SECS
977 );
978 }
979 }
980
981 Ok(())
982}
983
984#[cfg(test)]
985mod tests {
986 use super::*;
987
988 #[tokio::test]
989 async fn test_node_creation() {
990 let args = CliArgs {
991 network: "regtest".to_string(),
992 datadir: "/tmp/bitcoin-test".to_string(),
993 rpc_port: 18332,
994 p2p_port: 18333,
995 log_level: "debug".to_string(),
996 max_mempool_mb: 300,
997 enable_p2p: false,
998 seed_peers: Vec::new(),
999 storage_backend: "memory".to_string(),
1000 enable_wallet: true,
1001 address_type: "bech32".to_string(),
1002 wallet_file: None,
1003 signet_challenge: None,
1004 };
1005
1006 let node = BitcoinNode::new(args).await;
1007 assert!(node.is_ok());
1008 }
1009
1010 #[tokio::test]
1011 async fn test_node_chain_info() {
1012 let args = CliArgs {
1013 network: "regtest".to_string(),
1014 datadir: "/tmp/bitcoin-test".to_string(),
1015 rpc_port: 18332,
1016 p2p_port: 18333,
1017 log_level: "warn".to_string(),
1018 max_mempool_mb: 300,
1019 enable_p2p: false,
1020 seed_peers: Vec::new(),
1021 storage_backend: "memory".to_string(),
1022 enable_wallet: true,
1023 address_type: "bech32".to_string(),
1024 wallet_file: None,
1025 signet_challenge: None,
1026 };
1027
1028 let node = BitcoinNode::new(args).await.unwrap();
1029 let info = node.get_chain_info().await.unwrap();
1030 assert_eq!(info.height, 0);
1031 assert_eq!(info.blocks, 1);
1032 }
1033
1034 #[tokio::test]
1035 async fn test_node_has_block_index() {
1036 let args = CliArgs {
1037 network: "regtest".to_string(),
1038 datadir: "/tmp/bitcoin-test".to_string(),
1039 rpc_port: 18332,
1040 p2p_port: 18333,
1041 log_level: "warn".to_string(),
1042 max_mempool_mb: 300,
1043 enable_p2p: false,
1044 seed_peers: Vec::new(),
1045 storage_backend: "memory".to_string(),
1046 enable_wallet: true,
1047 address_type: "bech32".to_string(),
1048 wallet_file: None,
1049 signet_challenge: None,
1050 };
1051
1052 let node = BitcoinNode::new(args).await.unwrap();
1053
1054 let idx = node.block_index.read().await;
1056 assert_eq!(idx.best_height(), 0);
1057 assert_eq!(idx.header_count(), 1);
1058 }
1059
1060 #[tokio::test]
1061 async fn test_node_has_sync_manager() {
1062 let args = CliArgs {
1063 network: "regtest".to_string(),
1064 datadir: "/tmp/bitcoin-test".to_string(),
1065 rpc_port: 18332,
1066 p2p_port: 18333,
1067 log_level: "warn".to_string(),
1068 max_mempool_mb: 300,
1069 enable_p2p: false,
1070 seed_peers: Vec::new(),
1071 storage_backend: "memory".to_string(),
1072 enable_wallet: true,
1073 address_type: "bech32".to_string(),
1074 wallet_file: None,
1075 signet_challenge: None,
1076 };
1077
1078 let node = BitcoinNode::new(args).await.unwrap();
1079
1080 let sm = node.sync_manager.read().await;
1082 assert_eq!(sm.state(), SyncState::Idle);
1083 assert_eq!(sm.blocks_remaining(), 0);
1084 }
1085
1086 fn make_test_args() -> CliArgs {
1087 make_test_args_with_port(18332)
1088 }
1089
1090 fn make_test_args_with_port(rpc_port: u16) -> CliArgs {
1093 CliArgs {
1094 network: "regtest".to_string(),
1095 datadir: "/tmp/bitcoin-test".to_string(),
1096 rpc_port,
1097 p2p_port: 18333,
1098 log_level: "warn".to_string(),
1099 max_mempool_mb: 300,
1100 enable_p2p: false,
1101 seed_peers: Vec::new(),
1102 storage_backend: "memory".to_string(),
1103 enable_wallet: true,
1104 address_type: "bech32".to_string(),
1105 wallet_file: None,
1106 signet_challenge: None,
1107 }
1108 }
1109
1110 fn unique_port() -> u16 {
1112 use std::sync::atomic::{AtomicU16, Ordering};
1113 static PORT: AtomicU16 = AtomicU16::new(19400);
1114 PORT.fetch_add(1, Ordering::Relaxed)
1115 }
1116
1117 #[tokio::test]
1118 async fn test_node_has_fee_estimator() {
1119 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1120
1121 let fe = node.fee_estimator.read().await;
1123 let estimate = fe.estimate_fee(6);
1124 assert!(estimate > 0.0);
1126 }
1127
1128 #[tokio::test]
1129 async fn test_node_has_rebroadcast_manager() {
1130 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1131
1132 let rb = node.rebroadcast_manager.read().await;
1133 assert_eq!(
1134 rb.tracked_count(),
1135 0,
1136 "No transactions should be tracked initially"
1137 );
1138 }
1139
1140 #[tokio::test]
1141 async fn test_node_has_wallet_when_enabled() {
1142 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1143 assert!(
1144 node.wallet.is_some(),
1145 "Wallet should be present when enable_wallet is true"
1146 );
1147 }
1148
1149 #[tokio::test]
1150 async fn test_node_no_wallet_when_disabled() {
1151 let mut args = make_test_args();
1152 args.enable_wallet = false;
1153
1154 let node = BitcoinNode::new(args).await.unwrap();
1155 assert!(
1156 node.wallet.is_none(),
1157 "Wallet should be absent when enable_wallet is false"
1158 );
1159 }
1160
1161 #[tokio::test]
1162 async fn test_node_mempool_starts_empty() {
1163 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1164
1165 let info = node.mempool_adapter.get_mempool_info().await.unwrap();
1166 assert_eq!(info.size, 0);
1167 assert_eq!(info.bytes, 0);
1168 }
1169
1170 #[tokio::test]
1171 async fn test_node_testnet_creation() {
1172 let mut args = make_test_args();
1173 args.network = "testnet".to_string();
1174
1175 let node = BitcoinNode::new(args).await.unwrap();
1176 let info = node.get_chain_info().await.unwrap();
1177 assert_eq!(info.height, 0);
1178 assert_eq!(info.blocks, 1);
1179 }
1180
1181 #[tokio::test]
1182 async fn test_node_with_wallet_persistence() {
1183 let wallet_path = {
1184 let mut p = std::env::temp_dir();
1185 let id = std::time::SystemTime::now()
1186 .duration_since(std::time::UNIX_EPOCH)
1187 .unwrap()
1188 .as_nanos() as u64;
1189 p.push(format!("test_infra_wallet_{}.json", id));
1190 p.to_str().unwrap().to_string()
1191 };
1192
1193 {
1195 let mut args = make_test_args();
1196 args.wallet_file = Some(wallet_path.clone());
1197
1198 let node = BitcoinNode::new(args).await.unwrap();
1199 assert!(node.wallet.is_some());
1200
1201 let wallet = node.wallet.as_ref().unwrap();
1203 let addr = wallet.get_new_address(Some("persist-test")).await.unwrap();
1204 assert!(addr.starts_with("tb1q") || addr.starts_with("bcrt1q"));
1205 }
1206
1207 assert!(std::path::Path::new(&wallet_path).exists());
1209
1210 {
1212 let mut args = make_test_args();
1213 args.wallet_file = Some(wallet_path.clone());
1214
1215 let node = BitcoinNode::new(args).await.unwrap();
1216 let wallet = node.wallet.as_ref().unwrap();
1217
1218 let unspent = wallet.list_unspent(0, None).await.unwrap();
1220 assert_eq!(unspent.len(), 0);
1222 }
1223
1224 let _ = tokio::fs::remove_file(&wallet_path).await;
1226 }
1227
1228 #[tokio::test]
1229 async fn test_node_wallet_file_none_is_in_memory() {
1230 let args = make_test_args(); let node = BitcoinNode::new(args).await.unwrap();
1233 assert!(node.wallet.is_some());
1234
1235 let wallet = node.wallet.as_ref().unwrap();
1237 let addr = wallet.get_new_address(None).await.unwrap();
1238 assert!(!addr.is_empty());
1239 }
1240
1241 #[tokio::test]
1242 async fn test_node_signet_creation() {
1243 let mut args = make_test_args();
1244 args.network = "signet".to_string();
1245
1246 let node = BitcoinNode::new(args).await.unwrap();
1247 let info = node.get_chain_info().await.unwrap();
1248 assert_eq!(info.height, 0);
1249 assert_eq!(info.blocks, 1);
1250 }
1251
1252 #[tokio::test]
1253 async fn test_node_custom_signet_challenge() {
1254 let mut args = make_test_args();
1255 args.network = "signet".to_string();
1256 args.signet_challenge = Some("51".to_string());
1258
1259 let node = BitcoinNode::new(args).await.unwrap();
1260 let info = node.get_chain_info().await.unwrap();
1261 assert_eq!(info.height, 0);
1262 }
1263
1264 #[tokio::test]
1265 async fn test_node_invalid_signet_challenge_hex() {
1266 let mut args = make_test_args();
1267 args.network = "signet".to_string();
1268 args.signet_challenge = Some("not_valid_hex".to_string());
1269
1270 let result = BitcoinNode::new(args).await;
1271 assert!(result.is_err(), "Invalid hex should produce an error");
1272 }
1273
1274 #[test]
1277 fn test_task_tracker_register_and_count() {
1278 let mut tracker = TaskTracker::new();
1279 assert_eq!(tracker.active_count(), 0);
1280 assert_eq!(tracker.total, 0);
1281
1282 let g1 = tracker.register();
1283 assert_eq!(tracker.active_count(), 1);
1284 assert_eq!(tracker.total, 1);
1285
1286 let g2 = tracker.register();
1287 assert_eq!(tracker.active_count(), 2);
1288 assert_eq!(tracker.total, 2);
1289
1290 drop(g1);
1291 assert_eq!(tracker.active_count(), 1);
1292
1293 drop(g2);
1294 assert_eq!(tracker.active_count(), 0);
1295 assert_eq!(tracker.total, 2);
1297 }
1298
1299 #[test]
1300 fn test_task_guard_decrements_on_drop() {
1301 let mut tracker = TaskTracker::new();
1302 {
1303 let _g = tracker.register();
1304 assert_eq!(tracker.active_count(), 1);
1305 }
1306 assert_eq!(tracker.active_count(), 0);
1308 }
1309
1310 #[tokio::test]
1311 async fn test_node_starts_not_running() {
1312 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1313 assert!(!node.running.load(Ordering::Acquire));
1315 }
1316
1317 #[tokio::test]
1318 async fn test_node_start_sets_running() {
1319 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1320 .await
1321 .unwrap();
1322 node.start().await.unwrap();
1323 assert!(node.running.load(Ordering::Acquire));
1324 let _ = node.stop().await;
1326 }
1327
1328 #[tokio::test]
1329 async fn test_node_stop_clears_running() {
1330 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1331 .await
1332 .unwrap();
1333 node.start().await.unwrap();
1334 assert!(node.running.load(Ordering::Acquire));
1335
1336 node.stop().await.unwrap();
1337 assert!(!node.running.load(Ordering::Acquire));
1338 }
1339
1340 #[tokio::test]
1341 async fn test_shutdown_signal_stops_tasks() {
1342 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1343 .await
1344 .unwrap();
1345 node.start().await.unwrap();
1346
1347 let active_before = node.task_tracker.lock().unwrap().active_count();
1349 assert!(active_before > 0, "Expected background tasks to be running");
1350
1351 let _ = node.shutdown_tx.send(true);
1353 tokio::time::sleep(tokio::time::Duration::from_millis(200)).await;
1355
1356 let active_after = node.task_tracker.lock().unwrap().active_count();
1357 assert_eq!(
1358 active_after, 0,
1359 "All tasks should have exited after shutdown signal"
1360 );
1361
1362 let _ = node.stop().await;
1363 }
1364
1365 #[tokio::test]
1366 async fn test_health_before_start() {
1367 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1368 let health = node.health().await;
1369
1370 assert!(!health.is_running);
1371 assert_eq!(health.active_tasks, 0);
1372 assert_eq!(health.total_tasks, 0);
1373 assert_eq!(health.block_height, 0);
1374 assert_eq!(health.mempool_size, 0);
1375 assert_eq!(health.peer_count, 0);
1376 assert_eq!(health.sync_state, "Idle");
1377 }
1378
1379 #[tokio::test]
1380 async fn test_health_after_start() {
1381 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1382 .await
1383 .unwrap();
1384 node.start().await.unwrap();
1385
1386 let health = node.health().await;
1387 assert!(health.is_running);
1388 assert!(health.active_tasks > 0, "Background tasks should be active");
1389 assert!(health.total_tasks > 0, "Total tasks should be non-zero");
1390 assert!(health.rpc_running);
1391
1392 let _ = node.stop().await;
1393 }
1394
1395 #[tokio::test]
1396 async fn test_health_after_stop() {
1397 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1398 .await
1399 .unwrap();
1400 node.start().await.unwrap();
1401 node.stop().await.unwrap();
1402
1403 let health = node.health().await;
1404 assert!(!health.is_running);
1405 assert_eq!(health.active_tasks, 0, "No tasks should remain after stop");
1406 }
1407
1408 #[tokio::test]
1409 async fn test_uptime_increases() {
1410 let node = BitcoinNode::new(make_test_args()).await.unwrap();
1411 let t1 = node.uptime_secs();
1412 tokio::time::sleep(tokio::time::Duration::from_millis(1100)).await;
1413 let t2 = node.uptime_secs();
1414 assert!(t2 >= t1 + 1, "Uptime should increase over time");
1415 }
1416
1417 #[tokio::test]
1418 async fn test_node_health_fields_consistent() {
1419 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1420 .await
1421 .unwrap();
1422 node.start().await.unwrap();
1423
1424 let health = node.health().await;
1425
1426 assert!(health.active_tasks <= health.total_tasks);
1428 assert!(health.uptime_secs < 60);
1430
1431 let _ = node.stop().await;
1432 }
1433
1434 #[tokio::test]
1435 async fn test_double_stop_is_safe() {
1436 let node = BitcoinNode::new(make_test_args_with_port(unique_port()))
1437 .await
1438 .unwrap();
1439 node.start().await.unwrap();
1440
1441 node.stop().await.unwrap();
1443 node.stop().await.unwrap();
1445 }
1446
1447 #[test]
1448 fn test_node_health_clone() {
1449 let health = NodeHealth {
1450 is_running: true,
1451 active_tasks: 5,
1452 total_tasks: 5,
1453 uptime_secs: 100,
1454 sync_state: "Synced".to_string(),
1455 block_height: 800_000,
1456 mempool_size: 42,
1457 peer_count: 8,
1458 rpc_running: true,
1459 };
1460
1461 let cloned = health.clone();
1462 assert_eq!(cloned.is_running, true);
1463 assert_eq!(cloned.active_tasks, 5);
1464 assert_eq!(cloned.block_height, 800_000);
1465 assert_eq!(cloned.peer_count, 8);
1466 }
1467
1468 #[test]
1469 fn test_node_health_debug() {
1470 let health = NodeHealth {
1471 is_running: false,
1472 active_tasks: 0,
1473 total_tasks: 5,
1474 uptime_secs: 0,
1475 sync_state: "Idle".to_string(),
1476 block_height: 0,
1477 mempool_size: 0,
1478 peer_count: 0,
1479 rpc_running: false,
1480 };
1481
1482 let debug_str = format!("{:?}", health);
1483 assert!(debug_str.contains("NodeHealth"));
1484 assert!(debug_str.contains("is_running: false"));
1485 }
1486}