Skip to main content

abtc_application/
net_processing.rs

1//! Net Processing — Chain synchronization and message handling
2//!
3//! This module implements the "brain" of the P2P protocol, corresponding to
4//! Bitcoin Core's `net_processing.cpp`. It orchestrates:
5//!
6//! - **Headers-first sync**: Send `getheaders`, receive `headers`, add to block index
7//! - **Block download**: Request blocks via `getdata` for headers we've accepted
8//! - **Transaction relay**: Process `inv`/`getdata`/`tx` messages for mempool
9//! - **Initial Block Download (IBD)**: Fast sync from genesis to chain tip
10//!
11//! ## Architecture
12//!
13//! `SyncManager` holds references to the `BlockIndex`, `PeerManager`, and storage
14//! ports. It processes `PeerEvent` messages and drives the sync state machine.
15
16use crate::block_index::{BlockIndex, BlockValidationStatus};
17use crate::orphan_pool::OrphanPool;
18use crate::peer_scoring::{Misbehavior, PeerScoring, ScoreAction};
19use abtc_domain::primitives::{Block, BlockHash, BlockHeader, Transaction};
20use abtc_ports::{
21    BlockStore, ChainStateStore, InventoryItem, MempoolPort, NetworkMessage, PeerEvent, PeerInfo,
22    PeerManager,
23};
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::net::SocketAddr;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
29/// Maximum number of headers to request in a single getheaders message
30const MAX_HEADERS_PER_REQUEST: usize = 2000;
31
32/// Maximum number of blocks to have in-flight at once
33const MAX_BLOCKS_IN_FLIGHT: usize = 16;
34
35/// Maximum number of inventory items to send in a single inv message
36const MAX_INV_SIZE: usize = 500;
37
38/// Protocol version we require from peers
39const MIN_PEER_VERSION: u32 = 70015;
40
41/// Our protocol version
42const OUR_PROTOCOL_VERSION: u32 = 70016;
43
44/// Our user agent string
45const OUR_USER_AGENT: &str = "/agentic-bitcoin:0.1.0/";
46
47/// Our advertised services (NODE_NETWORK | NODE_WITNESS)
48const OUR_SERVICES: u64 = 1 | (1 << 3);
49
50/// Maximum number of addresses to send in a single addr message
51const MAX_ADDR_TO_SEND: usize = 1000;
52
53/// Maximum number of addresses to store in our address book
54const MAX_KNOWN_ADDRESSES: usize = 10_000;
55
56/// Maximum age of an address before we discard it (3 hours in seconds)
57const MAX_ADDR_AGE: u32 = 3 * 60 * 60;
58
59/// State of the P2P handshake with a peer.
60#[derive(Debug, Clone, Copy, PartialEq, Eq)]
61pub enum HandshakeState {
62    /// We've connected; waiting to send or receive Version message.
63    AwaitingVersion,
64    /// We've received their Version and sent ours + Verack; waiting for their Verack.
65    AwaitingVerack,
66    /// Handshake complete — ready for normal message exchange.
67    Complete,
68}
69
70/// Sync state for a single peer.
71#[derive(Debug, Clone)]
72struct PeerSyncState {
73    /// Peer info (used for logging and protocol checks).
74    info: PeerInfo,
75    /// State of the P2P handshake.
76    handshake: HandshakeState,
77    /// Whether we've sent a getheaders to this peer and are waiting for a response.
78    headers_sync_pending: bool,
79    /// Blocks we've requested from this peer.
80    blocks_in_flight: HashSet<BlockHash>,
81    /// The last header hash we received from this peer.
82    last_header_received: Option<BlockHash>,
83}
84
85/// Current state of the blockchain sync process.
86#[derive(Debug, Clone, Copy, PartialEq, Eq)]
87pub enum SyncState {
88    /// Initial state — no peers connected.
89    Idle,
90    /// Downloading headers from peers (IBD phase 1).
91    HeaderSync,
92    /// Downloading blocks for validated headers (IBD phase 2).
93    BlockSync,
94    /// Fully synced — processing new blocks as they arrive.
95    Synced,
96}
97
98/// The sync manager drives chain synchronization.
99///
100/// It processes peer events, manages the block index, and coordinates
101/// header/block downloads across multiple peers.
102pub struct SyncManager {
103    /// The block header index
104    block_index: Arc<RwLock<BlockIndex>>,
105    /// Per-peer sync state
106    peer_states: HashMap<u64, PeerSyncState>,
107    /// Current overall sync state
108    state: SyncState,
109    /// Queue of block hashes we need to download (in order)
110    blocks_to_download: VecDeque<BlockHash>,
111    /// Set of blocks currently being downloaded
112    blocks_in_flight: HashSet<BlockHash>,
113    /// Blocks we've received but haven't processed yet (may be out of order)
114    orphan_blocks: HashMap<BlockHash, Block>,
115    /// The next block height we need to connect to the chain
116    next_block_height: u32,
117    /// Transactions we've recently seen (to avoid re-requesting)
118    recently_seen_txids: HashSet<abtc_domain::primitives::Txid>,
119    /// Known peer addresses: (address → (timestamp, services))
120    known_addresses: HashMap<SocketAddr, (u32, u64)>,
121    /// Peer misbehavior scoring and ban tracking
122    peer_scoring: PeerScoring,
123    /// Orphan transaction pool — transactions whose parent inputs are missing
124    orphan_tx_pool: OrphanPool,
125}
126
127impl SyncManager {
128    /// Create a new sync manager with the given block index.
129    pub fn new(block_index: Arc<RwLock<BlockIndex>>) -> Self {
130        SyncManager {
131            block_index,
132            peer_states: HashMap::new(),
133            state: SyncState::Idle,
134            blocks_to_download: VecDeque::new(),
135            blocks_in_flight: HashSet::new(),
136            orphan_blocks: HashMap::new(),
137            next_block_height: 1, // Genesis is height 0, we start downloading from 1
138            recently_seen_txids: HashSet::new(),
139            known_addresses: HashMap::new(),
140            peer_scoring: PeerScoring::new(),
141            orphan_tx_pool: OrphanPool::new(),
142        }
143    }
144
145    /// Get the current sync state.
146    pub fn state(&self) -> SyncState {
147        self.state
148    }
149
150    /// Get the number of blocks remaining to download.
151    pub fn blocks_remaining(&self) -> usize {
152        self.blocks_to_download.len() + self.blocks_in_flight.len()
153    }
154
155    /// Process a peer event. Returns a list of actions the caller should take
156    /// (send messages, store blocks, etc.).
157    pub async fn on_peer_event(
158        &mut self,
159        event: PeerEvent,
160        peer_manager: &dyn PeerManager,
161        block_store: &dyn BlockStore,
162        chain_state: &dyn ChainStateStore,
163        mempool: &dyn MempoolPort,
164    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
165        let mut actions = Vec::new();
166
167        match event {
168            PeerEvent::Connected { peer_info } => {
169                actions.extend(self.on_peer_connected(peer_info, peer_manager).await?);
170            }
171            PeerEvent::Disconnected { peer_id } => {
172                self.on_peer_disconnected(peer_id);
173            }
174            PeerEvent::MessageReceived { peer_id, message } => {
175                actions.extend(
176                    self.on_message_received(
177                        peer_id,
178                        message,
179                        peer_manager,
180                        block_store,
181                        chain_state,
182                        mempool,
183                    )
184                    .await?,
185                );
186            }
187            PeerEvent::Misbehaving {
188                peer_id,
189                reason,
190                score,
191            } => {
192                tracing::warn!(
193                    "Peer {} misbehaving (score +{}): {}",
194                    peer_id,
195                    score,
196                    reason
197                );
198                // Record misbehavior and check if the peer should be banned.
199                let now = std::time::SystemTime::now()
200                    .duration_since(std::time::UNIX_EPOCH)
201                    .map(|d| d.as_secs())
202                    .unwrap_or(0);
203                let action =
204                    self.peer_scoring
205                        .record_misbehavior(peer_id, Misbehavior::Custom(score), now);
206                if let ScoreAction::Ban {
207                    peer_id,
208                    addr,
209                    reason,
210                } = action
211                {
212                    tracing::warn!("Banning peer {} ({}): {}", peer_id, addr, reason);
213                    actions.push(SyncAction::DisconnectPeer(peer_id));
214                }
215            }
216        }
217
218        Ok(actions)
219    }
220
221    /// Handle a new peer connection — initiate handshake by sending our Version.
222    async fn on_peer_connected(
223        &mut self,
224        peer_info: PeerInfo,
225        _peer_manager: &dyn PeerManager,
226    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
227        let peer_id = peer_info.id;
228
229        // Register peer in the scoring system.
230        self.peer_scoring.register_peer(peer_id, peer_info.addr);
231
232        tracing::info!(
233            "New peer connected: {} — initiating handshake",
234            peer_info.addr,
235        );
236
237        let best_height = {
238            let index = self.block_index.read().await;
239            index.best_height()
240        };
241
242        let sync_state = PeerSyncState {
243            info: peer_info.clone(),
244            handshake: HandshakeState::AwaitingVersion,
245            headers_sync_pending: false,
246            blocks_in_flight: HashSet::new(),
247            last_header_received: None,
248        };
249
250        self.peer_states.insert(peer_id, sync_state);
251
252        // Send our Version message
253        let version_msg = self.build_version_message(peer_info.addr, best_height);
254        Ok(vec![SyncAction::SendMessage(peer_id, version_msg)])
255    }
256
257    /// Build a Version message to send to a peer.
258    fn build_version_message(
259        &self,
260        addr_recv: std::net::SocketAddr,
261        start_height: u32,
262    ) -> NetworkMessage {
263        use std::time::{SystemTime, UNIX_EPOCH};
264        let timestamp = SystemTime::now()
265            .duration_since(UNIX_EPOCH)
266            .unwrap_or_default()
267            .as_secs() as i64;
268
269        NetworkMessage::Version {
270            version: OUR_PROTOCOL_VERSION,
271            services: OUR_SERVICES,
272            timestamp,
273            addr_recv,
274            addr_from: "0.0.0.0:0".parse().unwrap(),
275            nonce: rand::random::<u64>(),
276            user_agent: OUR_USER_AGENT.to_string(),
277            start_height,
278            relay: true,
279        }
280    }
281
282    /// Handle an incoming Version message — validate and advance handshake.
283    #[allow(clippy::too_many_arguments)]
284    async fn on_version(
285        &mut self,
286        peer_id: u64,
287        version: u32,
288        services: u64,
289        user_agent: String,
290        start_height: u32,
291        relay: bool,
292        _peer_manager: &dyn PeerManager,
293    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
294        let mut actions = Vec::new();
295
296        // Check minimum version
297        if version < MIN_PEER_VERSION {
298            tracing::warn!(
299                "Peer {} has old protocol version {}, disconnecting",
300                peer_id,
301                version
302            );
303            return Ok(vec![SyncAction::DisconnectPeer(peer_id)]);
304        }
305
306        // Update peer info with actual values from their Version
307        if let Some(state) = self.peer_states.get_mut(&peer_id) {
308            state.info.version = version;
309            state.info.services = services;
310            state.info.subver = user_agent.clone();
311            state.info.start_height = start_height;
312            state.info.relay_txs = relay;
313
314            match state.handshake {
315                HandshakeState::AwaitingVersion => {
316                    // We sent our Version, they sent theirs. Send Verack.
317                    state.handshake = HandshakeState::AwaitingVerack;
318                    actions.push(SyncAction::SendMessage(peer_id, NetworkMessage::Verack));
319                    tracing::debug!(
320                        "Received Version from peer {} (v{}, {}, height={}), sent Verack",
321                        peer_id,
322                        version,
323                        user_agent,
324                        start_height
325                    );
326                }
327                _ => {
328                    tracing::warn!(
329                        "Unexpected Version from peer {} (state: {:?})",
330                        peer_id,
331                        state.handshake
332                    );
333                }
334            }
335        }
336
337        Ok(actions)
338    }
339
340    /// Handle an incoming Verack message — complete handshake and start sync.
341    async fn on_verack(
342        &mut self,
343        peer_id: u64,
344        peer_manager: &dyn PeerManager,
345    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
346        let mut actions = Vec::new();
347
348        let should_start_sync = if let Some(state) = self.peer_states.get_mut(&peer_id) {
349            match state.handshake {
350                HandshakeState::AwaitingVerack => {
351                    state.handshake = HandshakeState::Complete;
352                    tracing::info!(
353                        "Handshake complete with peer {} (v{}, height={})",
354                        peer_id,
355                        state.info.version,
356                        state.info.start_height
357                    );
358                    true
359                }
360                _ => {
361                    tracing::warn!(
362                        "Unexpected Verack from peer {} (state: {:?})",
363                        peer_id,
364                        state.handshake
365                    );
366                    false
367                }
368            }
369        } else {
370            false
371        };
372
373        // Start header sync now that handshake is complete
374        if should_start_sync {
375            // Add the peer's address to our address book.
376            if let Some(state) = self.peer_states.get(&peer_id) {
377                let now = std::time::SystemTime::now()
378                    .duration_since(std::time::UNIX_EPOCH)
379                    .unwrap_or_default()
380                    .as_secs() as u32;
381                self.known_addresses
382                    .insert(state.info.addr, (now, state.info.services));
383            }
384
385            // Request addresses from the peer for discovery.
386            actions.push(SyncAction::SendMessage(peer_id, NetworkMessage::GetAddr));
387
388            if self.state == SyncState::Idle || self.state == SyncState::HeaderSync {
389                self.state = SyncState::HeaderSync;
390                self.request_headers_from_peer(peer_id, peer_manager)
391                    .await?;
392            }
393        }
394
395        Ok(actions)
396    }
397
398    /// Handle a peer disconnection — clean up state and reassign work.
399    fn on_peer_disconnected(&mut self, peer_id: u64) {
400        self.peer_scoring.remove_peer(peer_id);
401
402        if let Some(state) = self.peer_states.remove(&peer_id) {
403            // Put any in-flight blocks back in the download queue
404            for hash in &state.blocks_in_flight {
405                self.blocks_in_flight.remove(hash);
406                self.blocks_to_download.push_front(*hash);
407            }
408
409            // Remove orphan transactions from this peer
410            let orphans_removed = self.orphan_tx_pool.remove_for_peer(peer_id);
411            tracing::info!(
412                "Peer {} disconnected, {} blocks reassigned, {} orphan txs removed",
413                peer_id,
414                state.blocks_in_flight.len(),
415                orphans_removed,
416            );
417        }
418
419        if self.peer_states.is_empty() {
420            self.state = SyncState::Idle;
421        }
422    }
423
424    /// Handle an incoming message from a peer.
425    async fn on_message_received(
426        &mut self,
427        peer_id: u64,
428        message: NetworkMessage,
429        peer_manager: &dyn PeerManager,
430        block_store: &dyn BlockStore,
431        _chain_state: &dyn ChainStateStore,
432        mempool: &dyn MempoolPort,
433    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
434        let mut actions = Vec::new();
435
436        match message {
437            // ── Handshake ────────────────────────────────────────
438            NetworkMessage::Version {
439                version,
440                services,
441                user_agent,
442                start_height,
443                relay,
444                ..
445            } => {
446                actions.extend(
447                    self.on_version(
448                        peer_id,
449                        version,
450                        services,
451                        user_agent,
452                        start_height,
453                        relay,
454                        peer_manager,
455                    )
456                    .await?,
457                );
458            }
459            NetworkMessage::Verack => {
460                actions.extend(self.on_verack(peer_id, peer_manager).await?);
461            }
462
463            // ── Sync messages ────────────────────────────────────
464            NetworkMessage::Headers { headers } => {
465                actions.extend(self.on_headers(peer_id, headers, peer_manager).await?);
466            }
467            NetworkMessage::Block { block } => {
468                actions.extend(
469                    self.on_block(peer_id, block, block_store, peer_manager)
470                        .await?,
471                );
472            }
473            NetworkMessage::Inv { items } => {
474                actions.extend(self.on_inv(peer_id, items, peer_manager).await?);
475            }
476            NetworkMessage::GetHeaders {
477                block_locator,
478                hash_stop,
479                ..
480            } => {
481                actions.extend(
482                    self.on_getheaders(peer_id, block_locator, hash_stop)
483                        .await?,
484                );
485            }
486            NetworkMessage::GetBlocks {
487                block_locator,
488                hash_stop,
489                ..
490            } => {
491                actions.extend(self.on_getblocks(peer_id, block_locator, hash_stop).await?);
492            }
493            NetworkMessage::GetData { items } => {
494                actions.extend(self.on_getdata(peer_id, items, block_store).await?);
495            }
496            NetworkMessage::Ping { nonce } => {
497                actions.push(SyncAction::SendMessage(
498                    peer_id,
499                    NetworkMessage::Pong { nonce },
500                ));
501            }
502            NetworkMessage::Tx { tx } => {
503                let txid = tx.txid();
504
505                // Run basic consensus validation before accepting.
506                if let Err(e) = abtc_domain::consensus::rules::check_transaction(&tx) {
507                    tracing::debug!(
508                        "Rejected tx {} from peer {}: consensus violation: {}",
509                        txid,
510                        peer_id,
511                        e,
512                    );
513                    // Invalid — don't relay or process.
514                    return Ok(actions);
515                }
516
517                // Reject coinbase transactions from the network.
518                if tx.is_coinbase() {
519                    tracing::debug!("Rejected coinbase tx {} from peer {}", txid, peer_id);
520                    return Ok(actions);
521                }
522
523                // Expire stale orphans periodically.
524                let now = std::time::SystemTime::now()
525                    .duration_since(std::time::UNIX_EPOCH)
526                    .map(|d| d.as_secs())
527                    .unwrap_or(0);
528                self.orphan_tx_pool.expire_old_orphans(now);
529
530                // Attempt to add to the mempool.
531                match mempool.add_transaction(&tx).await {
532                    Ok(()) => {
533                        tracing::info!("Accepted tx {} from peer {} into mempool", txid, peer_id,);
534                        actions.push(SyncAction::AcceptedTransaction {
535                            tx: tx.clone(),
536                            from_peer: peer_id,
537                        });
538
539                        // Check if any orphan transactions were waiting for
540                        // outputs from this newly-accepted transaction.
541                        let children = self
542                            .orphan_tx_pool
543                            .get_children_of(&txid, tx.outputs.len() as u32);
544                        for child_txid in children {
545                            if let Some(entry) = self.orphan_tx_pool.remove_orphan(&child_txid) {
546                                tracing::debug!(
547                                    "Resolving orphan tx {} (parent {} now available)",
548                                    child_txid,
549                                    txid,
550                                );
551                                // Re-submit the orphan to the mempool
552                                match mempool.add_transaction(&entry.tx).await {
553                                    Ok(()) => {
554                                        tracing::info!(
555                                            "Orphan tx {} accepted into mempool",
556                                            child_txid,
557                                        );
558                                        actions.push(SyncAction::AcceptedTransaction {
559                                            tx: entry.tx,
560                                            from_peer: entry.from_peer,
561                                        });
562                                    }
563                                    Err(e) => {
564                                        tracing::debug!(
565                                            "Orphan tx {} still rejected: {}",
566                                            child_txid,
567                                            e,
568                                        );
569                                    }
570                                }
571                            }
572                        }
573                    }
574                    Err(e) => {
575                        tracing::debug!(
576                            "Mempool rejected tx {} from peer {}: {}",
577                            txid,
578                            peer_id,
579                            e,
580                        );
581                        // The mempool rejected this tx — it may be an orphan
582                        // (missing parent inputs). Try adding to orphan pool.
583                        let add_result = self.orphan_tx_pool.add_orphan(tx.clone(), peer_id, now);
584                        match add_result {
585                            crate::orphan_pool::AddOrphanResult::Added => {
586                                tracing::debug!(
587                                    "Added tx {} to orphan pool (from peer {})",
588                                    txid,
589                                    peer_id,
590                                );
591                            }
592                            crate::orphan_pool::AddOrphanResult::AddedAfterEviction { evicted } => {
593                                tracing::debug!(
594                                    "Added tx {} to orphan pool after evicting {} (from peer {})",
595                                    txid,
596                                    evicted,
597                                    peer_id,
598                                );
599                            }
600                            _ => {
601                                // Already exists or too large — emit ProcessTransaction
602                                // as fallback
603                                actions.push(SyncAction::ProcessTransaction(tx));
604                            }
605                        }
606                    }
607                }
608            }
609            NetworkMessage::Addr { addresses } => {
610                actions.extend(self.on_addr(peer_id, addresses).await?);
611            }
612            NetworkMessage::GetAddr => {
613                actions.extend(self.on_getaddr(peer_id).await?);
614            }
615            NetworkMessage::PackageTx { transactions } => {
616                let tx_count = transactions.len();
617                tracing::info!("Received package of {} txs from peer {}", tx_count, peer_id,);
618
619                // Validate package structure at the domain level first
620                match abtc_domain::policy::packages::validate_package(&transactions) {
621                    Ok(pkg_type) => {
622                        tracing::debug!(
623                            "Package from peer {} validated: {:?}, {} txs",
624                            peer_id,
625                            pkg_type,
626                            tx_count,
627                        );
628
629                        // Submit each transaction in topological order to the mempool.
630                        // Package fee evaluation happens at the acceptor level;
631                        // here we do basic per-tx submission.
632                        for tx in &transactions {
633                            let txid = tx.txid();
634                            match mempool.add_transaction(tx).await {
635                                Ok(()) => {
636                                    actions.push(SyncAction::AcceptedTransaction {
637                                        tx: tx.clone(),
638                                        from_peer: peer_id,
639                                    });
640                                }
641                                Err(e) => {
642                                    tracing::debug!(
643                                        "Package tx {} from peer {} rejected: {}",
644                                        txid,
645                                        peer_id,
646                                        e,
647                                    );
648                                }
649                            }
650                        }
651                    }
652                    Err(e) => {
653                        tracing::debug!("Invalid package from peer {}: {}", peer_id, e,);
654                    }
655                }
656            }
657            NetworkMessage::SendPackages { version } => {
658                tracing::info!("Peer {} supports package relay v{}", peer_id, version,);
659                // Record that this peer supports package relay.
660                // Feature negotiation tracking can be extended later.
661            }
662            _ => {
663                // Pong, etc. — ignored
664            }
665        }
666
667        Ok(actions)
668    }
669
670    /// Process received headers — add to block index and continue sync.
671    async fn on_headers(
672        &mut self,
673        peer_id: u64,
674        headers: Vec<BlockHeader>,
675        peer_manager: &dyn PeerManager,
676    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
677        if let Some(state) = self.peer_states.get_mut(&peer_id) {
678            state.headers_sync_pending = false;
679        }
680
681        if headers.is_empty() {
682            // No more headers — peer is caught up
683            tracing::info!("Peer {} has no more headers", peer_id);
684            self.transition_to_block_sync(peer_manager).await?;
685            return Ok(Vec::new());
686        }
687
688        let count = headers.len();
689        tracing::info!("Received {} headers from peer {}", count, peer_id);
690
691        let mut last_hash = None;
692        let mut new_headers = 0;
693        {
694            let mut index = self.block_index.write().await;
695            for header in &headers {
696                match index.add_header(header.clone()) {
697                    Ok((hash, _reorged)) => {
698                        last_hash = Some(hash);
699                        new_headers += 1;
700                    }
701                    Err(e) => {
702                        tracing::warn!("Failed to add header from peer {}: {}", peer_id, e);
703                        // Don't abort — continue with remaining headers
704                    }
705                }
706            }
707        }
708
709        if let Some(hash) = last_hash {
710            if let Some(state) = self.peer_states.get_mut(&peer_id) {
711                state.last_header_received = Some(hash);
712            }
713        }
714
715        tracing::info!(
716            "Added {} new headers (block index height: {})",
717            new_headers,
718            self.block_index.read().await.best_height()
719        );
720
721        // If we received a full batch, request more headers
722        if count >= MAX_HEADERS_PER_REQUEST {
723            self.request_headers_from_peer(peer_id, peer_manager)
724                .await?;
725        } else {
726            // Received fewer than max — this peer is caught up
727            self.transition_to_block_sync(peer_manager).await?;
728        }
729
730        Ok(Vec::new())
731    }
732
733    /// Process a received block — validate and connect to chain.
734    async fn on_block(
735        &mut self,
736        peer_id: u64,
737        block: Block,
738        _block_store: &dyn BlockStore,
739        peer_manager: &dyn PeerManager,
740    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
741        let hash = block.block_hash();
742
743        // Remove from in-flight tracking
744        self.blocks_in_flight.remove(&hash);
745        if let Some(state) = self.peer_states.get_mut(&peer_id) {
746            state.blocks_in_flight.remove(&hash);
747        }
748
749        tracing::debug!("Received block {} from peer {}", hash, peer_id);
750
751        // Check if this is the next block we need
752        let expected_hash = {
753            let index = self.block_index.read().await;
754            index.get_hash_at_height(self.next_block_height)
755        };
756
757        let actions = if expected_hash == Some(hash) {
758            // This is the next block in sequence — process it
759            self.next_block_height += 1;
760
761            // Mark as validated in the index
762            {
763                let mut index = self.block_index.write().await;
764                index.set_status(&hash, BlockValidationStatus::FullyValidated);
765            }
766
767            let mut result = vec![SyncAction::ProcessBlock(block)];
768
769            // Check if any orphan blocks can now be connected
770            while let Some(next_hash) = {
771                let index = self.block_index.read().await;
772                index.get_hash_at_height(self.next_block_height)
773            } {
774                if let Some(orphan) = self.orphan_blocks.remove(&next_hash) {
775                    self.next_block_height += 1;
776                    {
777                        let mut index = self.block_index.write().await;
778                        index.set_status(&next_hash, BlockValidationStatus::FullyValidated);
779                    }
780                    result.push(SyncAction::ProcessBlock(orphan));
781                } else {
782                    break;
783                }
784            }
785
786            result
787        } else {
788            // Out of order — store as orphan
789            self.orphan_blocks.insert(hash, block);
790            Vec::new()
791        };
792
793        // Request more blocks if we have capacity
794        self.request_blocks(peer_manager).await?;
795
796        // Check if sync is complete
797        if self.blocks_to_download.is_empty()
798            && self.blocks_in_flight.is_empty()
799            && self.orphan_blocks.is_empty()
800            && self.state == SyncState::BlockSync
801        {
802            self.state = SyncState::Synced;
803            tracing::info!(
804                "Chain sync complete at height {}",
805                self.next_block_height - 1
806            );
807        }
808
809        Ok(actions)
810    }
811
812    /// Process an inv message — request any blocks/txs we don't have.
813    async fn on_inv(
814        &mut self,
815        peer_id: u64,
816        items: Vec<InventoryItem>,
817        _peer_manager: &dyn PeerManager,
818    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
819        let mut blocks_to_request = Vec::new();
820        let mut txs_to_request = Vec::new();
821
822        for item in items {
823            match item {
824                InventoryItem::Block(hash) => {
825                    let known = self.block_index.read().await.contains(&hash);
826                    if !known && !self.blocks_in_flight.contains(&hash) {
827                        blocks_to_request.push(InventoryItem::Block(hash));
828                    }
829                }
830                InventoryItem::Tx(txid) => {
831                    if !self.recently_seen_txids.contains(&txid) {
832                        txs_to_request.push(InventoryItem::Tx(txid));
833                        self.recently_seen_txids.insert(txid);
834
835                        // Limit the size of the seen set
836                        if self.recently_seen_txids.len() > 50_000 {
837                            self.recently_seen_txids.clear();
838                        }
839                    }
840                }
841            }
842        }
843
844        let mut actions = Vec::new();
845
846        // Request unknown blocks
847        if !blocks_to_request.is_empty() {
848            actions.push(SyncAction::SendMessage(
849                peer_id,
850                NetworkMessage::GetData {
851                    items: blocks_to_request,
852                },
853            ));
854        }
855
856        // Request unknown transactions
857        if !txs_to_request.is_empty() {
858            actions.push(SyncAction::SendMessage(
859                peer_id,
860                NetworkMessage::GetData {
861                    items: txs_to_request,
862                },
863            ));
864        }
865
866        Ok(actions)
867    }
868
869    /// Process a getheaders request from a peer — respond with our headers.
870    async fn on_getheaders(
871        &mut self,
872        peer_id: u64,
873        block_locator: Vec<BlockHash>,
874        hash_stop: BlockHash,
875    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
876        let index = self.block_index.read().await;
877
878        // Find the fork point using the locator
879        let mut start_height = 0u32;
880        for locator_hash in &block_locator {
881            if let Some(entry) = index.get(locator_hash) {
882                start_height = entry.height + 1;
883                break;
884            }
885        }
886
887        // Collect headers from start_height
888        let mut headers = Vec::new();
889        let mut height = start_height;
890        while headers.len() < MAX_HEADERS_PER_REQUEST {
891            if let Some(hash) = index.get_hash_at_height(height) {
892                if let Some(entry) = index.get(&hash) {
893                    headers.push(entry.header.clone());
894                    if hash == hash_stop {
895                        break;
896                    }
897                }
898            } else {
899                break;
900            }
901            height += 1;
902        }
903
904        if !headers.is_empty() {
905            return Ok(vec![SyncAction::SendMessage(
906                peer_id,
907                NetworkMessage::Headers { headers },
908            )]);
909        }
910
911        Ok(Vec::new())
912    }
913
914    /// Process a getblocks request from a peer — respond with inv of block hashes.
915    ///
916    /// The peer provides a block locator (a sparse list of hashes from their chain).
917    /// We find the fork point and send back an `inv` with up to MAX_INV_SIZE
918    /// block hashes starting from there.
919    async fn on_getblocks(
920        &mut self,
921        peer_id: u64,
922        block_locator: Vec<BlockHash>,
923        hash_stop: BlockHash,
924    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
925        let index = self.block_index.read().await;
926
927        // Find the fork point using the locator (same logic as getheaders)
928        let mut start_height = 0u32;
929        for locator_hash in &block_locator {
930            if let Some(entry) = index.get(locator_hash) {
931                start_height = entry.height + 1;
932                break;
933            }
934        }
935
936        // Collect block hashes from start_height up to MAX_INV_SIZE
937        let mut inv_items = Vec::new();
938        let mut height = start_height;
939        while inv_items.len() < MAX_INV_SIZE {
940            if let Some(hash) = index.get_hash_at_height(height) {
941                inv_items.push(InventoryItem::Block(hash));
942                if hash == hash_stop {
943                    break;
944                }
945            } else {
946                break;
947            }
948            height += 1;
949        }
950
951        if !inv_items.is_empty() {
952            tracing::debug!(
953                "Responding to getblocks from peer {} with {} inv items (heights {}..{})",
954                peer_id,
955                inv_items.len(),
956                start_height,
957                height.saturating_sub(1),
958            );
959            return Ok(vec![SyncAction::SendMessage(
960                peer_id,
961                NetworkMessage::Inv { items: inv_items },
962            )]);
963        }
964
965        Ok(Vec::new())
966    }
967
968    /// Process a getdata request from a peer — respond with requested blocks/txs.
969    ///
970    /// For each requested block hash, look it up in the block store and send
971    /// it back. For each requested txid, look it up in the mempool (if
972    /// available) and send it back. Unknown items are silently skipped.
973    async fn on_getdata(
974        &mut self,
975        peer_id: u64,
976        items: Vec<InventoryItem>,
977        block_store: &dyn BlockStore,
978    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
979        let mut actions = Vec::new();
980
981        for item in items {
982            match item {
983                InventoryItem::Block(hash) => match block_store.get_block(&hash).await {
984                    Ok(Some(block)) => {
985                        tracing::debug!("Serving block {} to peer {}", hash, peer_id,);
986                        actions.push(SyncAction::SendMessage(
987                            peer_id,
988                            NetworkMessage::Block { block },
989                        ));
990                    }
991                    Ok(None) => {
992                        tracing::debug!("Peer {} requested unknown block {}", peer_id, hash,);
993                    }
994                    Err(e) => {
995                        tracing::warn!("Error fetching block {} for peer {}: {}", hash, peer_id, e,);
996                    }
997                },
998                InventoryItem::Tx(_txid) => {
999                    // Transaction serving from mempool is handled by
1000                    // on_getdata_with_mempool below.
1001                }
1002            }
1003        }
1004
1005        Ok(actions)
1006    }
1007
1008    /// Process a getdata request that may include transactions.
1009    ///
1010    /// This is the full version that also serves transactions from the mempool.
1011    pub async fn on_getdata_with_mempool(
1012        &mut self,
1013        peer_id: u64,
1014        items: Vec<InventoryItem>,
1015        block_store: &dyn BlockStore,
1016        mempool: &dyn MempoolPort,
1017    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
1018        let mut actions = Vec::new();
1019
1020        for item in items {
1021            match item {
1022                InventoryItem::Block(hash) => {
1023                    if let Ok(Some(block)) = block_store.get_block(&hash).await {
1024                        actions.push(SyncAction::SendMessage(
1025                            peer_id,
1026                            NetworkMessage::Block { block },
1027                        ));
1028                    }
1029                }
1030                InventoryItem::Tx(txid) => {
1031                    if let Ok(Some(entry)) = mempool.get_transaction(&txid).await {
1032                        tracing::debug!("Serving transaction {} to peer {}", txid, peer_id,);
1033                        actions.push(SyncAction::SendMessage(
1034                            peer_id,
1035                            NetworkMessage::Tx { tx: entry.tx },
1036                        ));
1037                    }
1038                }
1039            }
1040        }
1041
1042        Ok(actions)
1043    }
1044
1045    /// Announce a newly connected block to all connected peers.
1046    ///
1047    /// Sends an `inv` message with the block hash to every peer except
1048    /// the one that sent it to us (if known).
1049    pub async fn announce_block(
1050        &self,
1051        block_hash: BlockHash,
1052        from_peer: Option<u64>,
1053        peer_manager: &dyn PeerManager,
1054    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1055        let inv = NetworkMessage::Inv {
1056            items: vec![InventoryItem::Block(block_hash)],
1057        };
1058
1059        for &peer_id in self.peer_states.keys() {
1060            // Don't announce back to the peer that sent us the block
1061            if Some(peer_id) == from_peer {
1062                continue;
1063            }
1064            let _ = peer_manager.send_to_peer(peer_id, inv.clone()).await;
1065        }
1066
1067        Ok(())
1068    }
1069
1070    /// Announce a new transaction to all connected peers.
1071    ///
1072    /// Sends an `inv` message with the txid to every peer except
1073    /// the one that sent it to us (if known). This is called when a
1074    /// transaction is accepted into the mempool.
1075    pub async fn announce_transaction(
1076        &self,
1077        txid: abtc_domain::primitives::Txid,
1078        from_peer: Option<u64>,
1079        peer_manager: &dyn PeerManager,
1080    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1081        let inv = NetworkMessage::Inv {
1082            items: vec![InventoryItem::Tx(txid)],
1083        };
1084
1085        for &peer_id in self.peer_states.keys() {
1086            if Some(peer_id) == from_peer {
1087                continue;
1088            }
1089            let _ = peer_manager.send_to_peer(peer_id, inv.clone()).await;
1090        }
1091
1092        Ok(())
1093    }
1094
1095    // ── Address management ────────────────────────────────────────
1096
1097    /// Process an `addr` message — store new peer addresses.
1098    async fn on_addr(
1099        &mut self,
1100        peer_id: u64,
1101        addresses: Vec<(u32, SocketAddr)>,
1102    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
1103        let now = std::time::SystemTime::now()
1104            .duration_since(std::time::UNIX_EPOCH)
1105            .unwrap_or_default()
1106            .as_secs() as u32;
1107
1108        let mut added = 0usize;
1109        for (timestamp, addr) in &addresses {
1110            // Skip addresses that are too old.
1111            if now.saturating_sub(*timestamp) > MAX_ADDR_AGE {
1112                continue;
1113            }
1114            // Skip if we already know a more recent entry for this address.
1115            if let Some((existing_ts, _)) = self.known_addresses.get(addr) {
1116                if *existing_ts >= *timestamp {
1117                    continue;
1118                }
1119            }
1120            self.known_addresses
1121                .insert(*addr, (*timestamp, OUR_SERVICES));
1122            added += 1;
1123        }
1124
1125        // Evict oldest entries if we exceed the limit.
1126        while self.known_addresses.len() > MAX_KNOWN_ADDRESSES {
1127            // Find the oldest entry.
1128            if let Some(oldest_addr) = self
1129                .known_addresses
1130                .iter()
1131                .min_by_key(|(_, (ts, _))| *ts)
1132                .map(|(addr, _)| *addr)
1133            {
1134                self.known_addresses.remove(&oldest_addr);
1135            } else {
1136                break;
1137            }
1138        }
1139
1140        if added > 0 {
1141            tracing::debug!(
1142                "Learned {} new addresses from peer {} (total known: {})",
1143                added,
1144                peer_id,
1145                self.known_addresses.len(),
1146            );
1147        }
1148
1149        Ok(Vec::new())
1150    }
1151
1152    /// Process a `getaddr` message — respond with our known addresses.
1153    async fn on_getaddr(
1154        &self,
1155        peer_id: u64,
1156    ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
1157        let now = std::time::SystemTime::now()
1158            .duration_since(std::time::UNIX_EPOCH)
1159            .unwrap_or_default()
1160            .as_secs() as u32;
1161
1162        // Collect recent addresses (not too old), up to MAX_ADDR_TO_SEND.
1163        let mut addrs: Vec<(u32, SocketAddr)> = self
1164            .known_addresses
1165            .iter()
1166            .filter(|(_, (ts, _))| now.saturating_sub(*ts) <= MAX_ADDR_AGE)
1167            .map(|(addr, (ts, _))| (*ts, *addr))
1168            .collect();
1169
1170        // Sort by timestamp descending (most recent first) and truncate.
1171        addrs.sort_by(|a, b| b.0.cmp(&a.0));
1172        addrs.truncate(MAX_ADDR_TO_SEND);
1173
1174        if addrs.is_empty() {
1175            return Ok(Vec::new());
1176        }
1177
1178        tracing::debug!(
1179            "Sending {} addresses to peer {} in response to getaddr",
1180            addrs.len(),
1181            peer_id,
1182        );
1183
1184        Ok(vec![SyncAction::SendMessage(
1185            peer_id,
1186            NetworkMessage::Addr { addresses: addrs },
1187        )])
1188    }
1189
1190    /// Get the number of known peer addresses.
1191    pub fn known_address_count(&self) -> usize {
1192        self.known_addresses.len()
1193    }
1194
1195    /// Check whether a given address is currently banned.
1196    pub fn is_banned(&self, addr: &SocketAddr) -> bool {
1197        let now = std::time::SystemTime::now()
1198            .duration_since(std::time::UNIX_EPOCH)
1199            .map(|d| d.as_secs())
1200            .unwrap_or(0);
1201        self.peer_scoring.is_banned(addr, now)
1202    }
1203
1204    /// Get the ban score for a connected peer.
1205    pub fn peer_ban_score(&self, peer_id: u64) -> i32 {
1206        self.peer_scoring.get_score(peer_id)
1207    }
1208
1209    /// Get the number of orphan transactions currently held.
1210    pub fn orphan_tx_count(&self) -> usize {
1211        self.orphan_tx_pool.len()
1212    }
1213
1214    /// Send a getheaders message to a peer using our current block locator.
1215    async fn request_headers_from_peer(
1216        &mut self,
1217        peer_id: u64,
1218        peer_manager: &dyn PeerManager,
1219    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1220        if let Some(state) = self.peer_states.get_mut(&peer_id) {
1221            if state.headers_sync_pending {
1222                return Ok(()); // Already waiting
1223            }
1224            state.headers_sync_pending = true;
1225        }
1226
1227        let locator = self.block_index.read().await.build_locator();
1228
1229        let msg = NetworkMessage::GetHeaders {
1230            version: 70016,
1231            block_locator: locator,
1232            hash_stop: BlockHash::zero(), // Get as many as possible
1233        };
1234
1235        peer_manager.send_to_peer(peer_id, msg).await?;
1236        tracing::debug!("Sent getheaders to peer {}", peer_id);
1237        Ok(())
1238    }
1239
1240    /// Transition from header sync to block sync.
1241    async fn transition_to_block_sync(
1242        &mut self,
1243        peer_manager: &dyn PeerManager,
1244    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1245        if self.state != SyncState::HeaderSync {
1246            return Ok(());
1247        }
1248
1249        let index = self.block_index.read().await;
1250        let best_height = index.best_height();
1251
1252        // Queue all blocks from our current position to the tip
1253        self.blocks_to_download.clear();
1254        for height in self.next_block_height..=best_height {
1255            if let Some(hash) = index.get_hash_at_height(height) {
1256                self.blocks_to_download.push_back(hash);
1257            }
1258        }
1259
1260        drop(index);
1261
1262        let count = self.blocks_to_download.len();
1263        if count == 0 {
1264            self.state = SyncState::Synced;
1265            tracing::info!("Already synced — no blocks to download");
1266        } else {
1267            self.state = SyncState::BlockSync;
1268            tracing::info!("Transitioning to block sync: {} blocks to download", count);
1269            self.request_blocks(peer_manager).await?;
1270        }
1271
1272        Ok(())
1273    }
1274
1275    /// Request blocks from peers, up to MAX_BLOCKS_IN_FLIGHT.
1276    async fn request_blocks(
1277        &mut self,
1278        peer_manager: &dyn PeerManager,
1279    ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1280        // Distribute block requests across peers round-robin
1281        let peer_ids: Vec<u64> = self.peer_states.keys().copied().collect();
1282        if peer_ids.is_empty() {
1283            return Ok(());
1284        }
1285
1286        let mut peer_idx = 0;
1287
1288        while self.blocks_in_flight.len() < MAX_BLOCKS_IN_FLIGHT {
1289            let hash = match self.blocks_to_download.pop_front() {
1290                Some(h) => h,
1291                None => break,
1292            };
1293
1294            let peer_id = peer_ids[peer_idx % peer_ids.len()];
1295            peer_idx += 1;
1296
1297            self.blocks_in_flight.insert(hash);
1298            if let Some(state) = self.peer_states.get_mut(&peer_id) {
1299                state.blocks_in_flight.insert(hash);
1300            }
1301
1302            let msg = NetworkMessage::GetData {
1303                items: vec![InventoryItem::Block(hash)],
1304            };
1305            let _ = peer_manager.send_to_peer(peer_id, msg).await;
1306        }
1307
1308        Ok(())
1309    }
1310}
1311
1312/// Actions that the infrastructure layer should take after processing events.
1313#[derive(Debug)]
1314pub enum SyncAction {
1315    /// Store and validate this block
1316    ProcessBlock(Block),
1317    /// Submit this transaction to the mempool (fallback — mempool rejected it)
1318    ProcessTransaction(Transaction),
1319    /// Transaction was accepted into the mempool and should be relayed to peers
1320    AcceptedTransaction {
1321        /// The accepted transaction
1322        tx: Transaction,
1323        /// The peer that sent it (skip when relaying)
1324        from_peer: u64,
1325    },
1326    /// Send a message to a specific peer
1327    SendMessage(u64, NetworkMessage),
1328    /// Disconnect a peer
1329    DisconnectPeer(u64),
1330}
1331
1332#[cfg(test)]
1333mod tests {
1334    use super::*;
1335    use abtc_domain::primitives::Hash256;
1336
1337    fn make_header(prev: BlockHash, nonce: u32) -> BlockHeader {
1338        BlockHeader {
1339            version: 1,
1340            prev_block_hash: prev,
1341            merkle_root: Hash256::from_bytes([nonce as u8; 32]),
1342            time: 1231006505 + nonce,
1343            bits: 0x1d00ffff,
1344            nonce,
1345        }
1346    }
1347
1348    #[tokio::test]
1349    async fn test_sync_manager_creation() {
1350        let mut index = BlockIndex::new();
1351        let genesis = make_header(BlockHash::zero(), 0);
1352        index.init_genesis(genesis);
1353
1354        let index = Arc::new(RwLock::new(index));
1355        let manager = SyncManager::new(index);
1356
1357        assert_eq!(manager.state(), SyncState::Idle);
1358        assert_eq!(manager.blocks_remaining(), 0);
1359    }
1360
1361    #[tokio::test]
1362    async fn test_sync_state_transitions() {
1363        let mut index = BlockIndex::new();
1364        let genesis = make_header(BlockHash::zero(), 0);
1365        index.init_genesis(genesis);
1366
1367        let index = Arc::new(RwLock::new(index));
1368        let mut manager = SyncManager::new(index);
1369
1370        // Starts idle
1371        assert_eq!(manager.state(), SyncState::Idle);
1372
1373        // After connecting a peer with high version, should move to HeaderSync
1374        let peer_info = PeerInfo {
1375            id: 1,
1376            addr: "127.0.0.1:8333".parse().unwrap(),
1377            services: 1,
1378            version: 70016,
1379            subver: "/test/".to_string(),
1380            start_height: 100,
1381            relay_txs: true,
1382        };
1383
1384        // We can't fully test with a real PeerManager, but verify state tracking
1385        manager.peer_states.insert(
1386            1,
1387            PeerSyncState {
1388                info: peer_info,
1389                handshake: HandshakeState::Complete,
1390                headers_sync_pending: false,
1391                blocks_in_flight: HashSet::new(),
1392                last_header_received: None,
1393            },
1394        );
1395        manager.state = SyncState::HeaderSync;
1396
1397        assert_eq!(manager.state(), SyncState::HeaderSync);
1398    }
1399
1400    #[tokio::test]
1401    async fn test_peer_disconnect_reassigns_blocks() {
1402        let mut index = BlockIndex::new();
1403        let genesis = make_header(BlockHash::zero(), 0);
1404        let genesis_hash = genesis.block_hash();
1405        index.init_genesis(genesis);
1406
1407        let h1 = make_header(genesis_hash, 1);
1408        let (h1_hash, _) = index.add_header(h1).unwrap();
1409
1410        let index = Arc::new(RwLock::new(index));
1411        let mut manager = SyncManager::new(index);
1412
1413        // Simulate a peer with a block in flight
1414        let mut in_flight = HashSet::new();
1415        in_flight.insert(h1_hash);
1416        manager.blocks_in_flight.insert(h1_hash);
1417
1418        manager.peer_states.insert(
1419            1,
1420            PeerSyncState {
1421                info: PeerInfo {
1422                    id: 1,
1423                    addr: "127.0.0.1:8333".parse().unwrap(),
1424                    services: 1,
1425                    version: 70016,
1426                    subver: "/test/".to_string(),
1427                    start_height: 1,
1428                    relay_txs: true,
1429                },
1430                handshake: HandshakeState::Complete,
1431                headers_sync_pending: false,
1432                blocks_in_flight: in_flight,
1433                last_header_received: None,
1434            },
1435        );
1436
1437        // Disconnect peer — block should be moved back to download queue
1438        manager.on_peer_disconnected(1);
1439
1440        assert!(manager.blocks_in_flight.is_empty());
1441        assert_eq!(manager.blocks_to_download.len(), 1);
1442        assert_eq!(manager.blocks_to_download[0], h1_hash);
1443    }
1444
1445    fn build_chain(count: u32) -> (BlockIndex, Vec<BlockHash>) {
1446        let mut index = BlockIndex::new();
1447        let genesis = make_header(BlockHash::zero(), 0);
1448        let genesis_hash = genesis.block_hash();
1449        index.init_genesis(genesis);
1450
1451        let mut hashes = vec![genesis_hash];
1452        let mut prev = genesis_hash;
1453        for i in 1..=count {
1454            let h = make_header(prev, i);
1455            let (hash, _) = index.add_header(h).unwrap();
1456            hashes.push(hash);
1457            prev = hash;
1458        }
1459        (index, hashes)
1460    }
1461
1462    fn make_peer_state(id: u64) -> (u64, PeerSyncState) {
1463        (
1464            id,
1465            PeerSyncState {
1466                info: PeerInfo {
1467                    id,
1468                    addr: "127.0.0.1:8333".parse().unwrap(),
1469                    services: 1,
1470                    version: 70016,
1471                    subver: "/test/".to_string(),
1472                    start_height: 100,
1473                    relay_txs: true,
1474                },
1475                handshake: HandshakeState::Complete,
1476                headers_sync_pending: false,
1477                blocks_in_flight: HashSet::new(),
1478                last_header_received: None,
1479            },
1480        )
1481    }
1482
1483    #[tokio::test]
1484    async fn test_on_getblocks_responds_with_inv() {
1485        let (index, hashes) = build_chain(5);
1486        let genesis_hash = hashes[0];
1487        let index = Arc::new(RwLock::new(index));
1488        let mut manager = SyncManager::new(index);
1489
1490        // Peer sends getblocks with genesis as locator → should get hashes 1..5
1491        let actions = manager
1492            .on_getblocks(1, vec![genesis_hash], BlockHash::zero())
1493            .await
1494            .unwrap();
1495
1496        assert_eq!(actions.len(), 1);
1497        match &actions[0] {
1498            SyncAction::SendMessage(peer_id, NetworkMessage::Inv { items }) => {
1499                assert_eq!(*peer_id, 1);
1500                assert_eq!(items.len(), 5); // blocks 1,2,3,4,5
1501                                            // First item should be block at height 1
1502                match &items[0] {
1503                    InventoryItem::Block(h) => assert_eq!(*h, hashes[1]),
1504                    _ => panic!("Expected block inv item"),
1505                }
1506            }
1507            other => panic!("Expected SendMessage(Inv), got {:?}", other),
1508        }
1509    }
1510
1511    #[tokio::test]
1512    async fn test_on_getblocks_with_hash_stop() {
1513        let (index, hashes) = build_chain(5);
1514        let genesis_hash = hashes[0];
1515        let index = Arc::new(RwLock::new(index));
1516        let mut manager = SyncManager::new(index);
1517
1518        // Request blocks from genesis but stop at height 3
1519        let actions = manager
1520            .on_getblocks(1, vec![genesis_hash], hashes[3])
1521            .await
1522            .unwrap();
1523
1524        assert_eq!(actions.len(), 1);
1525        match &actions[0] {
1526            SyncAction::SendMessage(_, NetworkMessage::Inv { items }) => {
1527                assert_eq!(items.len(), 3); // blocks 1,2,3
1528            }
1529            other => panic!("Expected SendMessage(Inv), got {:?}", other),
1530        }
1531    }
1532
1533    #[tokio::test]
1534    async fn test_on_getblocks_empty_when_caught_up() {
1535        let (index, hashes) = build_chain(3);
1536        let tip = *hashes.last().unwrap();
1537        let index = Arc::new(RwLock::new(index));
1538        let mut manager = SyncManager::new(index);
1539
1540        // Peer's locator includes the tip → nothing to send
1541        let actions = manager
1542            .on_getblocks(1, vec![tip], BlockHash::zero())
1543            .await
1544            .unwrap();
1545
1546        assert!(actions.is_empty());
1547    }
1548
1549    #[tokio::test]
1550    async fn test_on_getblocks_from_middle() {
1551        let (index, hashes) = build_chain(5);
1552        let index = Arc::new(RwLock::new(index));
1553        let mut manager = SyncManager::new(index);
1554
1555        // Peer has up to height 2, sends that as locator
1556        let actions = manager
1557            .on_getblocks(1, vec![hashes[2]], BlockHash::zero())
1558            .await
1559            .unwrap();
1560
1561        assert_eq!(actions.len(), 1);
1562        match &actions[0] {
1563            SyncAction::SendMessage(_, NetworkMessage::Inv { items }) => {
1564                assert_eq!(items.len(), 3); // blocks 3,4,5
1565                match &items[0] {
1566                    InventoryItem::Block(h) => assert_eq!(*h, hashes[3]),
1567                    _ => panic!("Expected block inv item"),
1568                }
1569            }
1570            other => panic!("Expected SendMessage(Inv), got {:?}", other),
1571        }
1572    }
1573
1574    #[tokio::test]
1575    async fn test_on_getdata_serves_known_block() {
1576        let (index, hashes) = build_chain(3);
1577        let index = Arc::new(RwLock::new(index));
1578        let mut manager = SyncManager::new(index);
1579
1580        // Create a mock block store that has block at height 1
1581        let block_store = MockBlockStore {
1582            blocks: {
1583                let mut m = HashMap::new();
1584                // Create a minimal block for hash[1]
1585                let block = Block {
1586                    header: make_header(hashes[0], 1),
1587                    transactions: Vec::new(),
1588                };
1589                m.insert(block.block_hash(), block);
1590                m
1591            },
1592        };
1593
1594        let actions = manager
1595            .on_getdata(1, vec![InventoryItem::Block(hashes[1])], &block_store)
1596            .await
1597            .unwrap();
1598
1599        assert_eq!(actions.len(), 1);
1600        match &actions[0] {
1601            SyncAction::SendMessage(peer_id, NetworkMessage::Block { block }) => {
1602                assert_eq!(*peer_id, 1);
1603                assert_eq!(block.block_hash(), hashes[1]);
1604            }
1605            other => panic!("Expected SendMessage(Block), got {:?}", other),
1606        }
1607    }
1608
1609    #[tokio::test]
1610    async fn test_on_getdata_unknown_block_skipped() {
1611        let (index, _hashes) = build_chain(1);
1612        let index = Arc::new(RwLock::new(index));
1613        let mut manager = SyncManager::new(index);
1614
1615        let block_store = MockBlockStore {
1616            blocks: HashMap::new(), // empty
1617        };
1618
1619        let unknown_hash = BlockHash::from_hash(Hash256::from_bytes([0xFF; 32]));
1620        let actions = manager
1621            .on_getdata(1, vec![InventoryItem::Block(unknown_hash)], &block_store)
1622            .await
1623            .unwrap();
1624
1625        assert!(actions.is_empty());
1626    }
1627
1628    #[tokio::test]
1629    async fn test_announce_block_to_peers() {
1630        let (index, _hashes) = build_chain(1);
1631        let index = Arc::new(RwLock::new(index));
1632        let manager = SyncManager::new(index);
1633
1634        // Need peers to announce to — use stub
1635        let stub = abtc_adapters::network::StubPeerManager::new();
1636        let block_hash = BlockHash::from_hash(Hash256::from_bytes([0x42; 32]));
1637
1638        // No peers → should succeed silently
1639        manager
1640            .announce_block(block_hash, None, &stub)
1641            .await
1642            .unwrap();
1643    }
1644
1645    #[tokio::test]
1646    async fn test_announce_block_skips_sender() {
1647        let (index, _hashes) = build_chain(1);
1648        let index = Arc::new(RwLock::new(index));
1649        let mut manager = SyncManager::new(index);
1650
1651        // Add two peers
1652        let (id1, state1) = make_peer_state(1);
1653        let (id2, state2) = make_peer_state(2);
1654        manager.peer_states.insert(id1, state1);
1655        manager.peer_states.insert(id2, state2);
1656
1657        let stub = abtc_adapters::network::StubPeerManager::new();
1658        let block_hash = BlockHash::from_hash(Hash256::from_bytes([0x42; 32]));
1659
1660        // Announce from peer 1 → should only send to peer 2
1661        // (We can't easily verify with StubPeerManager, but at least
1662        // verify it doesn't crash)
1663        manager
1664            .announce_block(block_hash, Some(1), &stub)
1665            .await
1666            .unwrap();
1667    }
1668
1669    /// Minimal mock block store for testing getdata
1670    struct MockBlockStore {
1671        blocks: HashMap<BlockHash, Block>,
1672    }
1673
1674    #[async_trait::async_trait]
1675    impl BlockStore for MockBlockStore {
1676        async fn store_block(
1677            &self,
1678            _block: &Block,
1679            _height: u32,
1680        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1681            Ok(())
1682        }
1683
1684        async fn get_block(
1685            &self,
1686            hash: &BlockHash,
1687        ) -> Result<Option<Block>, Box<dyn std::error::Error + Send + Sync>> {
1688            Ok(self.blocks.get(hash).cloned())
1689        }
1690
1691        async fn get_block_header(
1692            &self,
1693            _hash: &BlockHash,
1694        ) -> Result<Option<BlockHeader>, Box<dyn std::error::Error + Send + Sync>> {
1695            Ok(None)
1696        }
1697
1698        async fn has_block(
1699            &self,
1700            hash: &BlockHash,
1701        ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
1702            Ok(self.blocks.contains_key(hash))
1703        }
1704
1705        async fn get_best_block_hash(
1706            &self,
1707        ) -> Result<BlockHash, Box<dyn std::error::Error + Send + Sync>> {
1708            Ok(BlockHash::zero())
1709        }
1710
1711        async fn get_block_height(
1712            &self,
1713            _hash: &BlockHash,
1714        ) -> Result<Option<u32>, Box<dyn std::error::Error + Send + Sync>> {
1715            Ok(None)
1716        }
1717    }
1718
1719    // ── Transaction relay tests ────────────────────────────────
1720
1721    /// Minimal mock mempool for testing tx relay.
1722    struct MockMempool {
1723        txs: HashMap<abtc_domain::primitives::Txid, abtc_ports::MempoolEntry>,
1724    }
1725
1726    #[async_trait::async_trait]
1727    impl MempoolPort for MockMempool {
1728        async fn add_transaction(
1729            &self,
1730            _tx: &Transaction,
1731        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1732            Ok(())
1733        }
1734        async fn remove_transaction(
1735            &self,
1736            _txid: &abtc_domain::primitives::Txid,
1737            _recursive: bool,
1738        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1739            Ok(())
1740        }
1741        async fn get_transaction(
1742            &self,
1743            txid: &abtc_domain::primitives::Txid,
1744        ) -> Result<Option<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
1745        {
1746            Ok(self.txs.get(txid).cloned())
1747        }
1748        async fn get_all_transactions(
1749            &self,
1750        ) -> Result<Vec<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
1751        {
1752            Ok(self.txs.values().cloned().collect())
1753        }
1754        async fn get_transaction_count(
1755            &self,
1756        ) -> Result<u32, Box<dyn std::error::Error + Send + Sync>> {
1757            Ok(self.txs.len() as u32)
1758        }
1759        async fn estimate_fee(
1760            &self,
1761            _target_blocks: u32,
1762        ) -> Result<f64, Box<dyn std::error::Error + Send + Sync>> {
1763            Ok(1.0)
1764        }
1765        async fn get_mempool_info(
1766            &self,
1767        ) -> Result<abtc_ports::MempoolInfo, Box<dyn std::error::Error + Send + Sync>> {
1768            Ok(abtc_ports::MempoolInfo {
1769                size: self.txs.len() as u32,
1770                bytes: 0,
1771                usage: 0,
1772                max_mempool: 300_000_000,
1773                min_relay_fee: 0.00001,
1774            })
1775        }
1776        async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1777            Ok(())
1778        }
1779    }
1780
1781    fn make_test_tx(value: i64) -> Transaction {
1782        use abtc_domain::primitives::{Amount, OutPoint, TxIn, TxOut, Txid};
1783        use abtc_domain::script::Script;
1784        let input = TxIn::final_input(OutPoint::new(Txid::zero(), 0), Script::new());
1785        let output = TxOut::new(Amount::from_sat(value), Script::new());
1786        Transaction::v1(vec![input], vec![output], 0)
1787    }
1788
1789    #[tokio::test]
1790    async fn test_announce_transaction_to_peers() {
1791        let (index, _hashes) = build_chain(1);
1792        let index = Arc::new(RwLock::new(index));
1793        let manager = SyncManager::new(index);
1794
1795        let stub = abtc_adapters::network::StubPeerManager::new();
1796        let txid = abtc_domain::primitives::Txid::zero();
1797
1798        // No peers → should succeed silently
1799        manager
1800            .announce_transaction(txid, None, &stub)
1801            .await
1802            .unwrap();
1803    }
1804
1805    #[tokio::test]
1806    async fn test_announce_transaction_skips_sender() {
1807        let (index, _hashes) = build_chain(1);
1808        let index = Arc::new(RwLock::new(index));
1809        let mut manager = SyncManager::new(index);
1810
1811        let (id1, state1) = make_peer_state(1);
1812        let (id2, state2) = make_peer_state(2);
1813        manager.peer_states.insert(id1, state1);
1814        manager.peer_states.insert(id2, state2);
1815
1816        let stub = abtc_adapters::network::StubPeerManager::new();
1817        let txid = abtc_domain::primitives::Txid::zero();
1818
1819        // Announce from peer 1 → should skip peer 1, send to peer 2
1820        manager
1821            .announce_transaction(txid, Some(1), &stub)
1822            .await
1823            .unwrap();
1824    }
1825
1826    #[tokio::test]
1827    async fn test_on_getdata_serves_transaction() {
1828        let (index, _hashes) = build_chain(1);
1829        let index = Arc::new(RwLock::new(index));
1830        let mut manager = SyncManager::new(index);
1831
1832        let tx = make_test_tx(5000);
1833        let txid = tx.txid();
1834
1835        let entry = abtc_ports::MempoolEntry {
1836            tx: tx.clone(),
1837            fee: abtc_domain::primitives::Amount::from_sat(100),
1838            size: 200,
1839            time: 0,
1840            height: 0,
1841            descendant_count: 0,
1842            descendant_size: 0,
1843            ancestor_count: 0,
1844            ancestor_size: 0,
1845        };
1846
1847        let mut txs = HashMap::new();
1848        txs.insert(txid, entry);
1849        let mempool = MockMempool { txs };
1850        let block_store = MockBlockStore {
1851            blocks: HashMap::new(),
1852        };
1853
1854        let actions = manager
1855            .on_getdata_with_mempool(1, vec![InventoryItem::Tx(txid)], &block_store, &mempool)
1856            .await
1857            .unwrap();
1858
1859        assert_eq!(actions.len(), 1);
1860        match &actions[0] {
1861            SyncAction::SendMessage(peer_id, NetworkMessage::Tx { tx: served_tx }) => {
1862                assert_eq!(*peer_id, 1);
1863                assert_eq!(served_tx.txid(), txid);
1864            }
1865            other => panic!("Expected SendMessage(Tx), got {:?}", other),
1866        }
1867    }
1868
1869    #[tokio::test]
1870    async fn test_on_getdata_unknown_tx_skipped() {
1871        let (index, _hashes) = build_chain(1);
1872        let index = Arc::new(RwLock::new(index));
1873        let mut manager = SyncManager::new(index);
1874
1875        let mempool = MockMempool {
1876            txs: HashMap::new(),
1877        };
1878        let block_store = MockBlockStore {
1879            blocks: HashMap::new(),
1880        };
1881
1882        let unknown_txid = abtc_domain::primitives::Txid::zero();
1883        let actions = manager
1884            .on_getdata_with_mempool(
1885                1,
1886                vec![InventoryItem::Tx(unknown_txid)],
1887                &block_store,
1888                &mempool,
1889            )
1890            .await
1891            .unwrap();
1892
1893        assert!(actions.is_empty());
1894    }
1895
1896    #[tokio::test]
1897    async fn test_on_inv_requests_unknown_tx() {
1898        let (index, _hashes) = build_chain(1);
1899        let index = Arc::new(RwLock::new(index));
1900        let mut manager = SyncManager::new(index);
1901
1902        let stub = abtc_adapters::network::StubPeerManager::new();
1903        let txid = abtc_domain::primitives::Txid::from_hash(Hash256::from_bytes([0xAB; 32]));
1904
1905        let actions = manager
1906            .on_inv(1, vec![InventoryItem::Tx(txid)], &stub)
1907            .await
1908            .unwrap();
1909
1910        // Should have sent a GetData request for the unknown tx
1911        assert!(!actions.is_empty());
1912    }
1913
1914    #[tokio::test]
1915    async fn test_on_inv_deduplicates_seen_tx() {
1916        let (index, _hashes) = build_chain(1);
1917        let index = Arc::new(RwLock::new(index));
1918        let mut manager = SyncManager::new(index);
1919
1920        let stub = abtc_adapters::network::StubPeerManager::new();
1921        let txid = abtc_domain::primitives::Txid::from_hash(Hash256::from_bytes([0xCD; 32]));
1922
1923        // First inv → should request
1924        let actions1 = manager
1925            .on_inv(1, vec![InventoryItem::Tx(txid)], &stub)
1926            .await
1927            .unwrap();
1928        assert!(!actions1.is_empty());
1929
1930        // Second inv (same txid) → should NOT request again
1931        let actions2 = manager
1932            .on_inv(1, vec![InventoryItem::Tx(txid)], &stub)
1933            .await
1934            .unwrap();
1935        // The txid is now in recently_seen, so no getdata for it
1936        let has_tx_request = actions2.iter().any(|a| {
1937            matches!(a, SyncAction::SendMessage(_, NetworkMessage::GetData { items })
1938                if items.iter().any(|i| matches!(i, InventoryItem::Tx(t) if *t == txid)))
1939        });
1940        assert!(!has_tx_request, "Should not re-request already-seen txid");
1941    }
1942
1943    // ── Handshake tests ──────────────────────────────────────
1944
1945    #[tokio::test]
1946    async fn test_handshake_sends_version_on_connect() {
1947        let (index, _) = build_chain(1);
1948        let index = Arc::new(RwLock::new(index));
1949        let mut manager = SyncManager::new(index);
1950
1951        let stub = abtc_adapters::network::StubPeerManager::new();
1952        let peer_info = PeerInfo {
1953            id: 1,
1954            addr: "127.0.0.1:8333".parse().unwrap(),
1955            services: 1,
1956            version: 70016,
1957            subver: "/test/".to_string(),
1958            start_height: 0,
1959            relay_txs: true,
1960        };
1961
1962        let actions = manager.on_peer_connected(peer_info, &stub).await.unwrap();
1963
1964        // Should send a Version message
1965        assert_eq!(actions.len(), 1);
1966        assert!(matches!(
1967            &actions[0],
1968            SyncAction::SendMessage(1, NetworkMessage::Version { .. })
1969        ));
1970
1971        // Peer should be in AwaitingVersion state
1972        assert_eq!(
1973            manager.peer_states.get(&1).unwrap().handshake,
1974            HandshakeState::AwaitingVersion
1975        );
1976    }
1977
1978    #[tokio::test]
1979    async fn test_handshake_version_then_verack() {
1980        let (index, _) = build_chain(1);
1981        let index = Arc::new(RwLock::new(index));
1982        let mut manager = SyncManager::new(index);
1983
1984        let stub = abtc_adapters::network::StubPeerManager::new();
1985        let peer_info = PeerInfo {
1986            id: 1,
1987            addr: "127.0.0.1:8333".parse().unwrap(),
1988            services: 1,
1989            version: 70016,
1990            subver: "/test/".to_string(),
1991            start_height: 100,
1992            relay_txs: true,
1993        };
1994
1995        // Step 1: Connect → sends our Version
1996        let _ = manager.on_peer_connected(peer_info, &stub).await.unwrap();
1997        assert_eq!(
1998            manager.peer_states.get(&1).unwrap().handshake,
1999            HandshakeState::AwaitingVersion
2000        );
2001
2002        // Step 2: Receive their Version → sends Verack, moves to AwaitingVerack
2003        let actions = manager
2004            .on_version(1, 70016, 1, "/remote/".to_string(), 200, true, &stub)
2005            .await
2006            .unwrap();
2007
2008        assert_eq!(actions.len(), 1);
2009        assert!(matches!(
2010            &actions[0],
2011            SyncAction::SendMessage(1, NetworkMessage::Verack)
2012        ));
2013        assert_eq!(
2014            manager.peer_states.get(&1).unwrap().handshake,
2015            HandshakeState::AwaitingVerack
2016        );
2017
2018        // Verify peer info was updated
2019        assert_eq!(manager.peer_states.get(&1).unwrap().info.start_height, 200);
2020        assert_eq!(manager.peer_states.get(&1).unwrap().info.subver, "/remote/");
2021
2022        // Step 3: Receive Verack → handshake complete
2023        let _ = manager.on_verack(1, &stub).await.unwrap();
2024        assert_eq!(
2025            manager.peer_states.get(&1).unwrap().handshake,
2026            HandshakeState::Complete
2027        );
2028        // Should have started header sync
2029        assert_eq!(manager.state(), SyncState::HeaderSync);
2030    }
2031
2032    #[tokio::test]
2033    async fn test_handshake_rejects_old_version() {
2034        let (index, _) = build_chain(1);
2035        let index = Arc::new(RwLock::new(index));
2036        let mut manager = SyncManager::new(index);
2037
2038        let stub = abtc_adapters::network::StubPeerManager::new();
2039        let peer_info = PeerInfo {
2040            id: 1,
2041            addr: "127.0.0.1:8333".parse().unwrap(),
2042            services: 1,
2043            version: 70016,
2044            subver: "/test/".to_string(),
2045            start_height: 0,
2046            relay_txs: true,
2047        };
2048
2049        let _ = manager.on_peer_connected(peer_info, &stub).await.unwrap();
2050
2051        // Send a Version with version < MIN_PEER_VERSION
2052        let actions = manager
2053            .on_version(1, 70000, 1, "/old/".to_string(), 100, true, &stub)
2054            .await
2055            .unwrap();
2056
2057        // Should disconnect the peer
2058        assert_eq!(actions.len(), 1);
2059        assert!(matches!(&actions[0], SyncAction::DisconnectPeer(1)));
2060    }
2061
2062    #[tokio::test]
2063    async fn test_ping_responds_with_pong() {
2064        let (index, _hashes) = build_chain(1);
2065        let index = Arc::new(RwLock::new(index));
2066        let mut manager = SyncManager::new(index);
2067
2068        let stub = abtc_adapters::network::StubPeerManager::new();
2069        let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2070        let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2071        let mempool = abtc_adapters::mempool::InMemoryMempool::new();
2072
2073        // Add a peer so message dispatch works
2074        let peer_info = PeerInfo {
2075            id: 1,
2076            addr: "127.0.0.1:8333".parse().unwrap(),
2077            services: 1,
2078            version: 70016,
2079            subver: "/test/".to_string(),
2080            start_height: 0,
2081            relay_txs: true,
2082        };
2083        manager.peer_states.insert(
2084            1,
2085            PeerSyncState {
2086                info: peer_info,
2087                handshake: HandshakeState::Complete,
2088                headers_sync_pending: false,
2089                blocks_in_flight: HashSet::new(),
2090                last_header_received: None,
2091            },
2092        );
2093
2094        let actions = manager
2095            .on_message_received(
2096                1,
2097                NetworkMessage::Ping { nonce: 42 },
2098                &stub,
2099                &block_store,
2100                &chain_state_store,
2101                &mempool,
2102            )
2103            .await
2104            .unwrap();
2105
2106        assert_eq!(actions.len(), 1);
2107        assert!(matches!(
2108            &actions[0],
2109            SyncAction::SendMessage(1, NetworkMessage::Pong { nonce: 42 })
2110        ));
2111    }
2112
2113    #[tokio::test]
2114    async fn test_build_version_message() {
2115        let (index, _) = build_chain(1);
2116        let index = Arc::new(RwLock::new(index));
2117        let manager = SyncManager::new(index);
2118
2119        let addr: std::net::SocketAddr = "1.2.3.4:8333".parse().unwrap();
2120        let msg = manager.build_version_message(addr, 500);
2121
2122        match msg {
2123            NetworkMessage::Version {
2124                version,
2125                services,
2126                user_agent,
2127                start_height,
2128                relay,
2129                ..
2130            } => {
2131                assert_eq!(version, OUR_PROTOCOL_VERSION);
2132                assert_eq!(services, OUR_SERVICES);
2133                assert!(user_agent.contains("agentic-bitcoin"));
2134                assert_eq!(start_height, 500);
2135                assert!(relay);
2136            }
2137            _ => panic!("Expected Version message"),
2138        }
2139    }
2140
2141    // ── Address management tests ────────────────────────────────
2142
2143    #[tokio::test]
2144    async fn test_on_addr_stores_addresses() {
2145        let (index, _) = build_chain(1);
2146        let index = Arc::new(RwLock::new(index));
2147        let mut manager = SyncManager::new(index);
2148
2149        let now = std::time::SystemTime::now()
2150            .duration_since(std::time::UNIX_EPOCH)
2151            .unwrap_or_default()
2152            .as_secs() as u32;
2153
2154        let addrs = vec![
2155            (now, "1.2.3.4:8333".parse().unwrap()),
2156            (now, "5.6.7.8:8333".parse().unwrap()),
2157        ];
2158
2159        let actions = manager.on_addr(1, addrs).await.unwrap();
2160        assert!(actions.is_empty()); // on_addr doesn't produce actions
2161        assert_eq!(manager.known_address_count(), 2);
2162    }
2163
2164    #[tokio::test]
2165    async fn test_on_addr_rejects_old_addresses() {
2166        let (index, _) = build_chain(1);
2167        let index = Arc::new(RwLock::new(index));
2168        let mut manager = SyncManager::new(index);
2169
2170        // Timestamp from 4 hours ago (exceeds MAX_ADDR_AGE of 3 hours)
2171        let old_ts = std::time::SystemTime::now()
2172            .duration_since(std::time::UNIX_EPOCH)
2173            .unwrap_or_default()
2174            .as_secs() as u32
2175            - 4 * 60 * 60;
2176
2177        let addrs = vec![(old_ts, "1.2.3.4:8333".parse().unwrap())];
2178        manager.on_addr(1, addrs).await.unwrap();
2179        assert_eq!(manager.known_address_count(), 0);
2180    }
2181
2182    #[tokio::test]
2183    async fn test_on_getaddr_responds_with_known_addresses() {
2184        let (index, _) = build_chain(1);
2185        let index = Arc::new(RwLock::new(index));
2186        let mut manager = SyncManager::new(index);
2187
2188        let now = std::time::SystemTime::now()
2189            .duration_since(std::time::UNIX_EPOCH)
2190            .unwrap_or_default()
2191            .as_secs() as u32;
2192
2193        // Pre-populate address book
2194        manager
2195            .known_addresses
2196            .insert("1.2.3.4:8333".parse().unwrap(), (now, OUR_SERVICES));
2197        manager
2198            .known_addresses
2199            .insert("5.6.7.8:8333".parse().unwrap(), (now - 60, OUR_SERVICES));
2200
2201        let actions = manager.on_getaddr(1).await.unwrap();
2202        assert_eq!(actions.len(), 1);
2203
2204        match &actions[0] {
2205            SyncAction::SendMessage(peer_id, NetworkMessage::Addr { addresses }) => {
2206                assert_eq!(*peer_id, 1);
2207                assert_eq!(addresses.len(), 2);
2208            }
2209            other => panic!("Expected SendMessage(Addr), got {:?}", other),
2210        }
2211    }
2212
2213    #[tokio::test]
2214    async fn test_on_getaddr_empty_when_no_addresses() {
2215        let (index, _) = build_chain(1);
2216        let index = Arc::new(RwLock::new(index));
2217        let manager = SyncManager::new(index);
2218
2219        let actions = manager.on_getaddr(1).await.unwrap();
2220        assert!(actions.is_empty());
2221    }
2222
2223    #[tokio::test]
2224    async fn test_handshake_sends_getaddr_after_verack() {
2225        let (index, _) = build_chain(1);
2226        let index = Arc::new(RwLock::new(index));
2227        let mut manager = SyncManager::new(index);
2228
2229        let stub = abtc_adapters::network::StubPeerManager::new();
2230        let peer_info = PeerInfo {
2231            id: 1,
2232            addr: "127.0.0.1:8333".parse().unwrap(),
2233            services: 1,
2234            version: 70016,
2235            subver: "/test/".to_string(),
2236            start_height: 100,
2237            relay_txs: true,
2238        };
2239
2240        // Connect → Version → Verack
2241        let _ = manager.on_peer_connected(peer_info, &stub).await.unwrap();
2242        let _ = manager
2243            .on_version(1, 70016, 1, "/remote/".to_string(), 200, true, &stub)
2244            .await
2245            .unwrap();
2246        let actions = manager.on_verack(1, &stub).await.unwrap();
2247
2248        // Should include a GetAddr message
2249        let has_getaddr = actions
2250            .iter()
2251            .any(|a| matches!(a, SyncAction::SendMessage(1, NetworkMessage::GetAddr)));
2252        assert!(has_getaddr, "Handshake should send GetAddr after Verack");
2253
2254        // Peer's address should be in the address book
2255        assert_eq!(manager.known_address_count(), 1);
2256    }
2257
2258    // ── Mempool integration tests ───────────────────────────────
2259
2260    #[tokio::test]
2261    async fn test_tx_message_accepted_to_mempool() {
2262        let (index, _hashes) = build_chain(1);
2263        let index = Arc::new(RwLock::new(index));
2264        let mut manager = SyncManager::new(index);
2265
2266        let stub = abtc_adapters::network::StubPeerManager::new();
2267        let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2268        let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2269        let mempool = abtc_adapters::mempool::InMemoryMempool::new();
2270
2271        // Add a peer
2272        let peer_info = PeerInfo {
2273            id: 1,
2274            addr: "127.0.0.1:8333".parse().unwrap(),
2275            services: 1,
2276            version: 70016,
2277            subver: "/test/".to_string(),
2278            start_height: 0,
2279            relay_txs: true,
2280        };
2281        manager.peer_states.insert(
2282            1,
2283            PeerSyncState {
2284                info: peer_info,
2285                handshake: HandshakeState::Complete,
2286                headers_sync_pending: false,
2287                blocks_in_flight: HashSet::new(),
2288                last_header_received: None,
2289            },
2290        );
2291
2292        let tx = make_test_tx(5000);
2293        let actions = manager
2294            .on_message_received(
2295                1,
2296                NetworkMessage::Tx { tx: tx.clone() },
2297                &stub,
2298                &block_store,
2299                &chain_state_store,
2300                &mempool,
2301            )
2302            .await
2303            .unwrap();
2304
2305        // Should produce AcceptedTransaction (mempool accepts any valid tx)
2306        assert!(!actions.is_empty());
2307        let has_accepted = actions
2308            .iter()
2309            .any(|a| matches!(a, SyncAction::AcceptedTransaction { .. }));
2310        assert!(
2311            has_accepted,
2312            "Valid tx should produce AcceptedTransaction action"
2313        );
2314
2315        // Verify the tx is now in the mempool
2316        assert_eq!(mempool.get_transaction_count().await.unwrap(), 1);
2317    }
2318
2319    #[tokio::test]
2320    async fn test_coinbase_tx_rejected() {
2321        let (index, _hashes) = build_chain(1);
2322        let index = Arc::new(RwLock::new(index));
2323        let mut manager = SyncManager::new(index);
2324
2325        let stub = abtc_adapters::network::StubPeerManager::new();
2326        let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2327        let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2328        let mempool = abtc_adapters::mempool::InMemoryMempool::new();
2329
2330        manager.peer_states.insert(
2331            1,
2332            PeerSyncState {
2333                info: PeerInfo {
2334                    id: 1,
2335                    addr: "127.0.0.1:8333".parse().unwrap(),
2336                    services: 1,
2337                    version: 70016,
2338                    subver: "/test/".to_string(),
2339                    start_height: 0,
2340                    relay_txs: true,
2341                },
2342                handshake: HandshakeState::Complete,
2343                headers_sync_pending: false,
2344                blocks_in_flight: HashSet::new(),
2345                last_header_received: None,
2346            },
2347        );
2348
2349        // Create a coinbase transaction
2350        let coinbase = Transaction::coinbase(
2351            1,
2352            abtc_domain::script::Script::from_bytes(vec![0x01, 0x01]),
2353            vec![abtc_domain::primitives::TxOut::new(
2354                abtc_domain::primitives::Amount::from_sat(5_000_000_000),
2355                abtc_domain::script::Script::new(),
2356            )],
2357        );
2358
2359        let actions = manager
2360            .on_message_received(
2361                1,
2362                NetworkMessage::Tx { tx: coinbase },
2363                &stub,
2364                &block_store,
2365                &chain_state_store,
2366                &mempool,
2367            )
2368            .await
2369            .unwrap();
2370
2371        // Should NOT produce any transaction actions (coinbase is rejected)
2372        let has_tx_action = actions.iter().any(|a| {
2373            matches!(
2374                a,
2375                SyncAction::AcceptedTransaction { .. } | SyncAction::ProcessTransaction(_)
2376            )
2377        });
2378        assert!(!has_tx_action, "Coinbase tx should be rejected from P2P");
2379    }
2380
2381    // ── Orphan pool integration tests ──────────────────────────
2382
2383    #[tokio::test]
2384    async fn test_rejected_tx_goes_to_orphan_pool() {
2385        let (index, _hashes) = build_chain(1);
2386        let index = Arc::new(RwLock::new(index));
2387        let mut manager = SyncManager::new(index);
2388
2389        let stub = abtc_adapters::network::StubPeerManager::new();
2390        let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2391        let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2392
2393        // Use a rejecting mempool
2394        let mempool = RejectingMempool;
2395
2396        manager.peer_states.insert(
2397            1,
2398            PeerSyncState {
2399                info: PeerInfo {
2400                    id: 1,
2401                    addr: "127.0.0.1:8333".parse().unwrap(),
2402                    services: 1,
2403                    version: 70016,
2404                    subver: "/test/".to_string(),
2405                    start_height: 0,
2406                    relay_txs: true,
2407                },
2408                handshake: HandshakeState::Complete,
2409                headers_sync_pending: false,
2410                blocks_in_flight: HashSet::new(),
2411                last_header_received: None,
2412            },
2413        );
2414
2415        let tx = make_test_tx(5000);
2416        let _ = manager
2417            .on_message_received(
2418                1,
2419                NetworkMessage::Tx { tx },
2420                &stub,
2421                &block_store,
2422                &chain_state_store,
2423                &mempool,
2424            )
2425            .await
2426            .unwrap();
2427
2428        // The rejected tx should be in the orphan pool
2429        assert_eq!(manager.orphan_tx_count(), 1);
2430    }
2431
2432    #[tokio::test]
2433    async fn test_peer_disconnect_clears_orphans() {
2434        let (index, _hashes) = build_chain(1);
2435        let index = Arc::new(RwLock::new(index));
2436        let mut manager = SyncManager::new(index);
2437
2438        let stub = abtc_adapters::network::StubPeerManager::new();
2439        let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2440        let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2441        let mempool = RejectingMempool;
2442
2443        manager.peer_states.insert(
2444            1,
2445            PeerSyncState {
2446                info: PeerInfo {
2447                    id: 1,
2448                    addr: "127.0.0.1:8333".parse().unwrap(),
2449                    services: 1,
2450                    version: 70016,
2451                    subver: "/test/".to_string(),
2452                    start_height: 0,
2453                    relay_txs: true,
2454                },
2455                handshake: HandshakeState::Complete,
2456                headers_sync_pending: false,
2457                blocks_in_flight: HashSet::new(),
2458                last_header_received: None,
2459            },
2460        );
2461
2462        // Send two rejected txs that go to orphan pool
2463        let tx1 = make_test_tx(5000);
2464        let tx2 = make_test_tx(6000);
2465        let _ = manager
2466            .on_message_received(
2467                1,
2468                NetworkMessage::Tx { tx: tx1 },
2469                &stub,
2470                &block_store,
2471                &chain_state_store,
2472                &mempool,
2473            )
2474            .await
2475            .unwrap();
2476        let _ = manager
2477            .on_message_received(
2478                1,
2479                NetworkMessage::Tx { tx: tx2 },
2480                &stub,
2481                &block_store,
2482                &chain_state_store,
2483                &mempool,
2484            )
2485            .await
2486            .unwrap();
2487        assert_eq!(manager.orphan_tx_count(), 2);
2488
2489        // Disconnect peer → orphans from this peer should be removed
2490        manager.on_peer_disconnected(1);
2491        assert_eq!(manager.orphan_tx_count(), 0);
2492    }
2493
2494    /// A mempool that rejects everything (simulates missing parent inputs).
2495    struct RejectingMempool;
2496
2497    #[async_trait::async_trait]
2498    impl MempoolPort for RejectingMempool {
2499        async fn add_transaction(
2500            &self,
2501            _tx: &Transaction,
2502        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2503            Err("missing inputs".into())
2504        }
2505        async fn remove_transaction(
2506            &self,
2507            _txid: &abtc_domain::primitives::Txid,
2508            _recursive: bool,
2509        ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2510            Ok(())
2511        }
2512        async fn get_transaction(
2513            &self,
2514            _txid: &abtc_domain::primitives::Txid,
2515        ) -> Result<Option<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
2516        {
2517            Ok(None)
2518        }
2519        async fn get_all_transactions(
2520            &self,
2521        ) -> Result<Vec<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
2522        {
2523            Ok(Vec::new())
2524        }
2525        async fn get_transaction_count(
2526            &self,
2527        ) -> Result<u32, Box<dyn std::error::Error + Send + Sync>> {
2528            Ok(0)
2529        }
2530        async fn estimate_fee(
2531            &self,
2532            _target_blocks: u32,
2533        ) -> Result<f64, Box<dyn std::error::Error + Send + Sync>> {
2534            Ok(1.0)
2535        }
2536        async fn get_mempool_info(
2537            &self,
2538        ) -> Result<abtc_ports::MempoolInfo, Box<dyn std::error::Error + Send + Sync>> {
2539            Ok(abtc_ports::MempoolInfo {
2540                size: 0,
2541                bytes: 0,
2542                usage: 0,
2543                max_mempool: 300_000_000,
2544                min_relay_fee: 0.00001,
2545            })
2546        }
2547        async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2548            Ok(())
2549        }
2550    }
2551}