1use crate::block_index::{BlockIndex, BlockValidationStatus};
17use crate::orphan_pool::OrphanPool;
18use crate::peer_scoring::{Misbehavior, PeerScoring, ScoreAction};
19use abtc_domain::primitives::{Block, BlockHash, BlockHeader, Transaction};
20use abtc_ports::{
21 BlockStore, ChainStateStore, InventoryItem, MempoolPort, NetworkMessage, PeerEvent, PeerInfo,
22 PeerManager,
23};
24use std::collections::{HashMap, HashSet, VecDeque};
25use std::net::SocketAddr;
26use std::sync::Arc;
27use tokio::sync::RwLock;
28
/// Maximum number of headers placed in one `headers` reply. A received batch
/// of at least this size also triggers a follow-up `getheaders` request.
const MAX_HEADERS_PER_REQUEST: usize = 2000;

/// Upper bound on the number of block requests outstanding at any time.
const MAX_BLOCKS_IN_FLIGHT: usize = 16;

/// Maximum number of inventory items returned for one `getblocks` request.
const MAX_INV_SIZE: usize = 500;

/// Peers advertising a protocol version below this value are disconnected.
const MIN_PEER_VERSION: u32 = 70015;

/// Protocol version advertised in our own `version` message.
const OUR_PROTOCOL_VERSION: u32 = 70016;

/// User agent advertised in our own `version` message.
const OUR_USER_AGENT: &str = "/agentic-bitcoin:0.1.0/";

/// Service bits advertised in our `version` message (bits 0 and 3 set).
// NOTE(review): presumably NODE_NETWORK | NODE_WITNESS — confirm against the protocol constants.
const OUR_SERVICES: u64 = 1 | (1 << 3);

/// Maximum number of addresses sent in a single `addr` reply.
const MAX_ADDR_TO_SEND: usize = 1000;

/// Capacity of the known-address table; the oldest entries are evicted beyond it.
const MAX_KNOWN_ADDRESSES: usize = 10_000;

/// Maximum age, in seconds (three hours), of an address we accept or share.
const MAX_ADDR_AGE: u32 = 3 * 60 * 60;
58
/// Progress of the version/verack handshake with a single peer.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum HandshakeState {
    /// We have sent our `version` and are waiting for the peer's `version`.
    AwaitingVersion,
    /// The peer's `version` arrived and our `verack` was sent; waiting for
    /// the peer's `verack`.
    AwaitingVerack,
    /// Both sides have acknowledged; the peer takes part in sync.
    Complete,
}
69
/// Per-peer bookkeeping kept by [`SyncManager`].
#[derive(Debug, Clone)]
struct PeerSyncState {
    /// Connection metadata, refreshed from the peer's `version` message.
    info: PeerInfo,
    /// Where this peer is in the version/verack handshake.
    handshake: HandshakeState,
    /// `true` while a `getheaders` request to this peer is unanswered.
    headers_sync_pending: bool,
    /// Hashes of blocks requested from this peer and not yet received.
    blocks_in_flight: HashSet<BlockHash>,
    /// Hash of the last header from this peer that was added to the index.
    last_header_received: Option<BlockHash>,
}
84
/// Phase of the overall chain synchronization state machine.
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub enum SyncState {
    /// No sync in progress (initial state, and the state after the last peer
    /// disconnects).
    Idle,
    /// Downloading headers from peers.
    HeaderSync,
    /// Downloading the blocks for headers already in the index.
    BlockSync,
    /// Nothing left to download.
    Synced,
}
97
/// Coordinates peer handshakes, header and block synchronization,
/// transaction relay bookkeeping and address gossip for all connected peers.
pub struct SyncManager {
    /// Shared header index consulted for heights, locators and block status.
    block_index: Arc<RwLock<BlockIndex>>,
    /// Per-peer handshake and download state, keyed by peer id.
    peer_states: HashMap<u64, PeerSyncState>,
    /// Current phase of the sync state machine.
    state: SyncState,
    /// Hashes still waiting to be requested, in download order.
    blocks_to_download: VecDeque<BlockHash>,
    /// Hashes requested from some peer and not yet received.
    blocks_in_flight: HashSet<BlockHash>,
    /// Blocks that arrived before they could be connected at
    /// `next_block_height`.
    orphan_blocks: HashMap<BlockHash, Block>,
    /// Height of the next block to hand off for processing.
    next_block_height: u32,
    /// Transaction ids already requested in response to `inv` announcements.
    recently_seen_txids: HashSet<abtc_domain::primitives::Txid>,
    /// Known peer addresses mapped to `(last-seen timestamp, service bits)`.
    known_addresses: HashMap<SocketAddr, (u32, u64)>,
    /// Misbehavior scores and ban decisions.
    peer_scoring: PeerScoring,
    /// Transactions the mempool rejected, held for a later retry.
    orphan_tx_pool: OrphanPool,
}
126
127impl SyncManager {
128 pub fn new(block_index: Arc<RwLock<BlockIndex>>) -> Self {
130 SyncManager {
131 block_index,
132 peer_states: HashMap::new(),
133 state: SyncState::Idle,
134 blocks_to_download: VecDeque::new(),
135 blocks_in_flight: HashSet::new(),
136 orphan_blocks: HashMap::new(),
137 next_block_height: 1, recently_seen_txids: HashSet::new(),
139 known_addresses: HashMap::new(),
140 peer_scoring: PeerScoring::new(),
141 orphan_tx_pool: OrphanPool::new(),
142 }
143 }
144
145 pub fn state(&self) -> SyncState {
147 self.state
148 }
149
150 pub fn blocks_remaining(&self) -> usize {
152 self.blocks_to_download.len() + self.blocks_in_flight.len()
153 }
154
155 pub async fn on_peer_event(
158 &mut self,
159 event: PeerEvent,
160 peer_manager: &dyn PeerManager,
161 block_store: &dyn BlockStore,
162 chain_state: &dyn ChainStateStore,
163 mempool: &dyn MempoolPort,
164 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
165 let mut actions = Vec::new();
166
167 match event {
168 PeerEvent::Connected { peer_info } => {
169 actions.extend(self.on_peer_connected(peer_info, peer_manager).await?);
170 }
171 PeerEvent::Disconnected { peer_id } => {
172 self.on_peer_disconnected(peer_id);
173 }
174 PeerEvent::MessageReceived { peer_id, message } => {
175 actions.extend(
176 self.on_message_received(
177 peer_id,
178 message,
179 peer_manager,
180 block_store,
181 chain_state,
182 mempool,
183 )
184 .await?,
185 );
186 }
187 PeerEvent::Misbehaving {
188 peer_id,
189 reason,
190 score,
191 } => {
192 tracing::warn!(
193 "Peer {} misbehaving (score +{}): {}",
194 peer_id,
195 score,
196 reason
197 );
198 let now = std::time::SystemTime::now()
200 .duration_since(std::time::UNIX_EPOCH)
201 .map(|d| d.as_secs())
202 .unwrap_or(0);
203 let action =
204 self.peer_scoring
205 .record_misbehavior(peer_id, Misbehavior::Custom(score), now);
206 if let ScoreAction::Ban {
207 peer_id,
208 addr,
209 reason,
210 } = action
211 {
212 tracing::warn!("Banning peer {} ({}): {}", peer_id, addr, reason);
213 actions.push(SyncAction::DisconnectPeer(peer_id));
214 }
215 }
216 }
217
218 Ok(actions)
219 }
220
221 async fn on_peer_connected(
223 &mut self,
224 peer_info: PeerInfo,
225 _peer_manager: &dyn PeerManager,
226 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
227 let peer_id = peer_info.id;
228
229 self.peer_scoring.register_peer(peer_id, peer_info.addr);
231
232 tracing::info!(
233 "New peer connected: {} — initiating handshake",
234 peer_info.addr,
235 );
236
237 let best_height = {
238 let index = self.block_index.read().await;
239 index.best_height()
240 };
241
242 let sync_state = PeerSyncState {
243 info: peer_info.clone(),
244 handshake: HandshakeState::AwaitingVersion,
245 headers_sync_pending: false,
246 blocks_in_flight: HashSet::new(),
247 last_header_received: None,
248 };
249
250 self.peer_states.insert(peer_id, sync_state);
251
252 let version_msg = self.build_version_message(peer_info.addr, best_height);
254 Ok(vec![SyncAction::SendMessage(peer_id, version_msg)])
255 }
256
257 fn build_version_message(
259 &self,
260 addr_recv: std::net::SocketAddr,
261 start_height: u32,
262 ) -> NetworkMessage {
263 use std::time::{SystemTime, UNIX_EPOCH};
264 let timestamp = SystemTime::now()
265 .duration_since(UNIX_EPOCH)
266 .unwrap_or_default()
267 .as_secs() as i64;
268
269 NetworkMessage::Version {
270 version: OUR_PROTOCOL_VERSION,
271 services: OUR_SERVICES,
272 timestamp,
273 addr_recv,
274 addr_from: "0.0.0.0:0".parse().unwrap(),
275 nonce: rand::random::<u64>(),
276 user_agent: OUR_USER_AGENT.to_string(),
277 start_height,
278 relay: true,
279 }
280 }
281
282 #[allow(clippy::too_many_arguments)]
284 async fn on_version(
285 &mut self,
286 peer_id: u64,
287 version: u32,
288 services: u64,
289 user_agent: String,
290 start_height: u32,
291 relay: bool,
292 _peer_manager: &dyn PeerManager,
293 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
294 let mut actions = Vec::new();
295
296 if version < MIN_PEER_VERSION {
298 tracing::warn!(
299 "Peer {} has old protocol version {}, disconnecting",
300 peer_id,
301 version
302 );
303 return Ok(vec![SyncAction::DisconnectPeer(peer_id)]);
304 }
305
306 if let Some(state) = self.peer_states.get_mut(&peer_id) {
308 state.info.version = version;
309 state.info.services = services;
310 state.info.subver = user_agent.clone();
311 state.info.start_height = start_height;
312 state.info.relay_txs = relay;
313
314 match state.handshake {
315 HandshakeState::AwaitingVersion => {
316 state.handshake = HandshakeState::AwaitingVerack;
318 actions.push(SyncAction::SendMessage(peer_id, NetworkMessage::Verack));
319 tracing::debug!(
320 "Received Version from peer {} (v{}, {}, height={}), sent Verack",
321 peer_id,
322 version,
323 user_agent,
324 start_height
325 );
326 }
327 _ => {
328 tracing::warn!(
329 "Unexpected Version from peer {} (state: {:?})",
330 peer_id,
331 state.handshake
332 );
333 }
334 }
335 }
336
337 Ok(actions)
338 }
339
340 async fn on_verack(
342 &mut self,
343 peer_id: u64,
344 peer_manager: &dyn PeerManager,
345 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
346 let mut actions = Vec::new();
347
348 let should_start_sync = if let Some(state) = self.peer_states.get_mut(&peer_id) {
349 match state.handshake {
350 HandshakeState::AwaitingVerack => {
351 state.handshake = HandshakeState::Complete;
352 tracing::info!(
353 "Handshake complete with peer {} (v{}, height={})",
354 peer_id,
355 state.info.version,
356 state.info.start_height
357 );
358 true
359 }
360 _ => {
361 tracing::warn!(
362 "Unexpected Verack from peer {} (state: {:?})",
363 peer_id,
364 state.handshake
365 );
366 false
367 }
368 }
369 } else {
370 false
371 };
372
373 if should_start_sync {
375 if let Some(state) = self.peer_states.get(&peer_id) {
377 let now = std::time::SystemTime::now()
378 .duration_since(std::time::UNIX_EPOCH)
379 .unwrap_or_default()
380 .as_secs() as u32;
381 self.known_addresses
382 .insert(state.info.addr, (now, state.info.services));
383 }
384
385 actions.push(SyncAction::SendMessage(peer_id, NetworkMessage::GetAddr));
387
388 if self.state == SyncState::Idle || self.state == SyncState::HeaderSync {
389 self.state = SyncState::HeaderSync;
390 self.request_headers_from_peer(peer_id, peer_manager)
391 .await?;
392 }
393 }
394
395 Ok(actions)
396 }
397
398 fn on_peer_disconnected(&mut self, peer_id: u64) {
400 self.peer_scoring.remove_peer(peer_id);
401
402 if let Some(state) = self.peer_states.remove(&peer_id) {
403 for hash in &state.blocks_in_flight {
405 self.blocks_in_flight.remove(hash);
406 self.blocks_to_download.push_front(*hash);
407 }
408
409 let orphans_removed = self.orphan_tx_pool.remove_for_peer(peer_id);
411 tracing::info!(
412 "Peer {} disconnected, {} blocks reassigned, {} orphan txs removed",
413 peer_id,
414 state.blocks_in_flight.len(),
415 orphans_removed,
416 );
417 }
418
419 if self.peer_states.is_empty() {
420 self.state = SyncState::Idle;
421 }
422 }
423
424 async fn on_message_received(
426 &mut self,
427 peer_id: u64,
428 message: NetworkMessage,
429 peer_manager: &dyn PeerManager,
430 block_store: &dyn BlockStore,
431 _chain_state: &dyn ChainStateStore,
432 mempool: &dyn MempoolPort,
433 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
434 let mut actions = Vec::new();
435
436 match message {
437 NetworkMessage::Version {
439 version,
440 services,
441 user_agent,
442 start_height,
443 relay,
444 ..
445 } => {
446 actions.extend(
447 self.on_version(
448 peer_id,
449 version,
450 services,
451 user_agent,
452 start_height,
453 relay,
454 peer_manager,
455 )
456 .await?,
457 );
458 }
459 NetworkMessage::Verack => {
460 actions.extend(self.on_verack(peer_id, peer_manager).await?);
461 }
462
463 NetworkMessage::Headers { headers } => {
465 actions.extend(self.on_headers(peer_id, headers, peer_manager).await?);
466 }
467 NetworkMessage::Block { block } => {
468 actions.extend(
469 self.on_block(peer_id, block, block_store, peer_manager)
470 .await?,
471 );
472 }
473 NetworkMessage::Inv { items } => {
474 actions.extend(self.on_inv(peer_id, items, peer_manager).await?);
475 }
476 NetworkMessage::GetHeaders {
477 block_locator,
478 hash_stop,
479 ..
480 } => {
481 actions.extend(
482 self.on_getheaders(peer_id, block_locator, hash_stop)
483 .await?,
484 );
485 }
486 NetworkMessage::GetBlocks {
487 block_locator,
488 hash_stop,
489 ..
490 } => {
491 actions.extend(self.on_getblocks(peer_id, block_locator, hash_stop).await?);
492 }
493 NetworkMessage::GetData { items } => {
494 actions.extend(self.on_getdata(peer_id, items, block_store).await?);
495 }
496 NetworkMessage::Ping { nonce } => {
497 actions.push(SyncAction::SendMessage(
498 peer_id,
499 NetworkMessage::Pong { nonce },
500 ));
501 }
502 NetworkMessage::Tx { tx } => {
503 let txid = tx.txid();
504
505 if let Err(e) = abtc_domain::consensus::rules::check_transaction(&tx) {
507 tracing::debug!(
508 "Rejected tx {} from peer {}: consensus violation: {}",
509 txid,
510 peer_id,
511 e,
512 );
513 return Ok(actions);
515 }
516
517 if tx.is_coinbase() {
519 tracing::debug!("Rejected coinbase tx {} from peer {}", txid, peer_id);
520 return Ok(actions);
521 }
522
523 let now = std::time::SystemTime::now()
525 .duration_since(std::time::UNIX_EPOCH)
526 .map(|d| d.as_secs())
527 .unwrap_or(0);
528 self.orphan_tx_pool.expire_old_orphans(now);
529
530 match mempool.add_transaction(&tx).await {
532 Ok(()) => {
533 tracing::info!("Accepted tx {} from peer {} into mempool", txid, peer_id,);
534 actions.push(SyncAction::AcceptedTransaction {
535 tx: tx.clone(),
536 from_peer: peer_id,
537 });
538
539 let children = self
542 .orphan_tx_pool
543 .get_children_of(&txid, tx.outputs.len() as u32);
544 for child_txid in children {
545 if let Some(entry) = self.orphan_tx_pool.remove_orphan(&child_txid) {
546 tracing::debug!(
547 "Resolving orphan tx {} (parent {} now available)",
548 child_txid,
549 txid,
550 );
551 match mempool.add_transaction(&entry.tx).await {
553 Ok(()) => {
554 tracing::info!(
555 "Orphan tx {} accepted into mempool",
556 child_txid,
557 );
558 actions.push(SyncAction::AcceptedTransaction {
559 tx: entry.tx,
560 from_peer: entry.from_peer,
561 });
562 }
563 Err(e) => {
564 tracing::debug!(
565 "Orphan tx {} still rejected: {}",
566 child_txid,
567 e,
568 );
569 }
570 }
571 }
572 }
573 }
574 Err(e) => {
575 tracing::debug!(
576 "Mempool rejected tx {} from peer {}: {}",
577 txid,
578 peer_id,
579 e,
580 );
581 let add_result = self.orphan_tx_pool.add_orphan(tx.clone(), peer_id, now);
584 match add_result {
585 crate::orphan_pool::AddOrphanResult::Added => {
586 tracing::debug!(
587 "Added tx {} to orphan pool (from peer {})",
588 txid,
589 peer_id,
590 );
591 }
592 crate::orphan_pool::AddOrphanResult::AddedAfterEviction { evicted } => {
593 tracing::debug!(
594 "Added tx {} to orphan pool after evicting {} (from peer {})",
595 txid,
596 evicted,
597 peer_id,
598 );
599 }
600 _ => {
601 actions.push(SyncAction::ProcessTransaction(tx));
604 }
605 }
606 }
607 }
608 }
609 NetworkMessage::Addr { addresses } => {
610 actions.extend(self.on_addr(peer_id, addresses).await?);
611 }
612 NetworkMessage::GetAddr => {
613 actions.extend(self.on_getaddr(peer_id).await?);
614 }
615 NetworkMessage::PackageTx { transactions } => {
616 let tx_count = transactions.len();
617 tracing::info!("Received package of {} txs from peer {}", tx_count, peer_id,);
618
619 match abtc_domain::policy::packages::validate_package(&transactions) {
621 Ok(pkg_type) => {
622 tracing::debug!(
623 "Package from peer {} validated: {:?}, {} txs",
624 peer_id,
625 pkg_type,
626 tx_count,
627 );
628
629 for tx in &transactions {
633 let txid = tx.txid();
634 match mempool.add_transaction(tx).await {
635 Ok(()) => {
636 actions.push(SyncAction::AcceptedTransaction {
637 tx: tx.clone(),
638 from_peer: peer_id,
639 });
640 }
641 Err(e) => {
642 tracing::debug!(
643 "Package tx {} from peer {} rejected: {}",
644 txid,
645 peer_id,
646 e,
647 );
648 }
649 }
650 }
651 }
652 Err(e) => {
653 tracing::debug!("Invalid package from peer {}: {}", peer_id, e,);
654 }
655 }
656 }
657 NetworkMessage::SendPackages { version } => {
658 tracing::info!("Peer {} supports package relay v{}", peer_id, version,);
659 }
662 _ => {
663 }
665 }
666
667 Ok(actions)
668 }
669
670 async fn on_headers(
672 &mut self,
673 peer_id: u64,
674 headers: Vec<BlockHeader>,
675 peer_manager: &dyn PeerManager,
676 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
677 if let Some(state) = self.peer_states.get_mut(&peer_id) {
678 state.headers_sync_pending = false;
679 }
680
681 if headers.is_empty() {
682 tracing::info!("Peer {} has no more headers", peer_id);
684 self.transition_to_block_sync(peer_manager).await?;
685 return Ok(Vec::new());
686 }
687
688 let count = headers.len();
689 tracing::info!("Received {} headers from peer {}", count, peer_id);
690
691 let mut last_hash = None;
692 let mut new_headers = 0;
693 {
694 let mut index = self.block_index.write().await;
695 for header in &headers {
696 match index.add_header(header.clone()) {
697 Ok((hash, _reorged)) => {
698 last_hash = Some(hash);
699 new_headers += 1;
700 }
701 Err(e) => {
702 tracing::warn!("Failed to add header from peer {}: {}", peer_id, e);
703 }
705 }
706 }
707 }
708
709 if let Some(hash) = last_hash {
710 if let Some(state) = self.peer_states.get_mut(&peer_id) {
711 state.last_header_received = Some(hash);
712 }
713 }
714
715 tracing::info!(
716 "Added {} new headers (block index height: {})",
717 new_headers,
718 self.block_index.read().await.best_height()
719 );
720
721 if count >= MAX_HEADERS_PER_REQUEST {
723 self.request_headers_from_peer(peer_id, peer_manager)
724 .await?;
725 } else {
726 self.transition_to_block_sync(peer_manager).await?;
728 }
729
730 Ok(Vec::new())
731 }
732
733 async fn on_block(
735 &mut self,
736 peer_id: u64,
737 block: Block,
738 _block_store: &dyn BlockStore,
739 peer_manager: &dyn PeerManager,
740 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
741 let hash = block.block_hash();
742
743 self.blocks_in_flight.remove(&hash);
745 if let Some(state) = self.peer_states.get_mut(&peer_id) {
746 state.blocks_in_flight.remove(&hash);
747 }
748
749 tracing::debug!("Received block {} from peer {}", hash, peer_id);
750
751 let expected_hash = {
753 let index = self.block_index.read().await;
754 index.get_hash_at_height(self.next_block_height)
755 };
756
757 let actions = if expected_hash == Some(hash) {
758 self.next_block_height += 1;
760
761 {
763 let mut index = self.block_index.write().await;
764 index.set_status(&hash, BlockValidationStatus::FullyValidated);
765 }
766
767 let mut result = vec![SyncAction::ProcessBlock(block)];
768
769 while let Some(next_hash) = {
771 let index = self.block_index.read().await;
772 index.get_hash_at_height(self.next_block_height)
773 } {
774 if let Some(orphan) = self.orphan_blocks.remove(&next_hash) {
775 self.next_block_height += 1;
776 {
777 let mut index = self.block_index.write().await;
778 index.set_status(&next_hash, BlockValidationStatus::FullyValidated);
779 }
780 result.push(SyncAction::ProcessBlock(orphan));
781 } else {
782 break;
783 }
784 }
785
786 result
787 } else {
788 self.orphan_blocks.insert(hash, block);
790 Vec::new()
791 };
792
793 self.request_blocks(peer_manager).await?;
795
796 if self.blocks_to_download.is_empty()
798 && self.blocks_in_flight.is_empty()
799 && self.orphan_blocks.is_empty()
800 && self.state == SyncState::BlockSync
801 {
802 self.state = SyncState::Synced;
803 tracing::info!(
804 "Chain sync complete at height {}",
805 self.next_block_height - 1
806 );
807 }
808
809 Ok(actions)
810 }
811
812 async fn on_inv(
814 &mut self,
815 peer_id: u64,
816 items: Vec<InventoryItem>,
817 _peer_manager: &dyn PeerManager,
818 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
819 let mut blocks_to_request = Vec::new();
820 let mut txs_to_request = Vec::new();
821
822 for item in items {
823 match item {
824 InventoryItem::Block(hash) => {
825 let known = self.block_index.read().await.contains(&hash);
826 if !known && !self.blocks_in_flight.contains(&hash) {
827 blocks_to_request.push(InventoryItem::Block(hash));
828 }
829 }
830 InventoryItem::Tx(txid) => {
831 if !self.recently_seen_txids.contains(&txid) {
832 txs_to_request.push(InventoryItem::Tx(txid));
833 self.recently_seen_txids.insert(txid);
834
835 if self.recently_seen_txids.len() > 50_000 {
837 self.recently_seen_txids.clear();
838 }
839 }
840 }
841 }
842 }
843
844 let mut actions = Vec::new();
845
846 if !blocks_to_request.is_empty() {
848 actions.push(SyncAction::SendMessage(
849 peer_id,
850 NetworkMessage::GetData {
851 items: blocks_to_request,
852 },
853 ));
854 }
855
856 if !txs_to_request.is_empty() {
858 actions.push(SyncAction::SendMessage(
859 peer_id,
860 NetworkMessage::GetData {
861 items: txs_to_request,
862 },
863 ));
864 }
865
866 Ok(actions)
867 }
868
869 async fn on_getheaders(
871 &mut self,
872 peer_id: u64,
873 block_locator: Vec<BlockHash>,
874 hash_stop: BlockHash,
875 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
876 let index = self.block_index.read().await;
877
878 let mut start_height = 0u32;
880 for locator_hash in &block_locator {
881 if let Some(entry) = index.get(locator_hash) {
882 start_height = entry.height + 1;
883 break;
884 }
885 }
886
887 let mut headers = Vec::new();
889 let mut height = start_height;
890 while headers.len() < MAX_HEADERS_PER_REQUEST {
891 if let Some(hash) = index.get_hash_at_height(height) {
892 if let Some(entry) = index.get(&hash) {
893 headers.push(entry.header.clone());
894 if hash == hash_stop {
895 break;
896 }
897 }
898 } else {
899 break;
900 }
901 height += 1;
902 }
903
904 if !headers.is_empty() {
905 return Ok(vec![SyncAction::SendMessage(
906 peer_id,
907 NetworkMessage::Headers { headers },
908 )]);
909 }
910
911 Ok(Vec::new())
912 }
913
914 async fn on_getblocks(
920 &mut self,
921 peer_id: u64,
922 block_locator: Vec<BlockHash>,
923 hash_stop: BlockHash,
924 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
925 let index = self.block_index.read().await;
926
927 let mut start_height = 0u32;
929 for locator_hash in &block_locator {
930 if let Some(entry) = index.get(locator_hash) {
931 start_height = entry.height + 1;
932 break;
933 }
934 }
935
936 let mut inv_items = Vec::new();
938 let mut height = start_height;
939 while inv_items.len() < MAX_INV_SIZE {
940 if let Some(hash) = index.get_hash_at_height(height) {
941 inv_items.push(InventoryItem::Block(hash));
942 if hash == hash_stop {
943 break;
944 }
945 } else {
946 break;
947 }
948 height += 1;
949 }
950
951 if !inv_items.is_empty() {
952 tracing::debug!(
953 "Responding to getblocks from peer {} with {} inv items (heights {}..{})",
954 peer_id,
955 inv_items.len(),
956 start_height,
957 height.saturating_sub(1),
958 );
959 return Ok(vec![SyncAction::SendMessage(
960 peer_id,
961 NetworkMessage::Inv { items: inv_items },
962 )]);
963 }
964
965 Ok(Vec::new())
966 }
967
968 async fn on_getdata(
974 &mut self,
975 peer_id: u64,
976 items: Vec<InventoryItem>,
977 block_store: &dyn BlockStore,
978 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
979 let mut actions = Vec::new();
980
981 for item in items {
982 match item {
983 InventoryItem::Block(hash) => match block_store.get_block(&hash).await {
984 Ok(Some(block)) => {
985 tracing::debug!("Serving block {} to peer {}", hash, peer_id,);
986 actions.push(SyncAction::SendMessage(
987 peer_id,
988 NetworkMessage::Block { block },
989 ));
990 }
991 Ok(None) => {
992 tracing::debug!("Peer {} requested unknown block {}", peer_id, hash,);
993 }
994 Err(e) => {
995 tracing::warn!("Error fetching block {} for peer {}: {}", hash, peer_id, e,);
996 }
997 },
998 InventoryItem::Tx(_txid) => {
999 }
1002 }
1003 }
1004
1005 Ok(actions)
1006 }
1007
1008 pub async fn on_getdata_with_mempool(
1012 &mut self,
1013 peer_id: u64,
1014 items: Vec<InventoryItem>,
1015 block_store: &dyn BlockStore,
1016 mempool: &dyn MempoolPort,
1017 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
1018 let mut actions = Vec::new();
1019
1020 for item in items {
1021 match item {
1022 InventoryItem::Block(hash) => {
1023 if let Ok(Some(block)) = block_store.get_block(&hash).await {
1024 actions.push(SyncAction::SendMessage(
1025 peer_id,
1026 NetworkMessage::Block { block },
1027 ));
1028 }
1029 }
1030 InventoryItem::Tx(txid) => {
1031 if let Ok(Some(entry)) = mempool.get_transaction(&txid).await {
1032 tracing::debug!("Serving transaction {} to peer {}", txid, peer_id,);
1033 actions.push(SyncAction::SendMessage(
1034 peer_id,
1035 NetworkMessage::Tx { tx: entry.tx },
1036 ));
1037 }
1038 }
1039 }
1040 }
1041
1042 Ok(actions)
1043 }
1044
1045 pub async fn announce_block(
1050 &self,
1051 block_hash: BlockHash,
1052 from_peer: Option<u64>,
1053 peer_manager: &dyn PeerManager,
1054 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1055 let inv = NetworkMessage::Inv {
1056 items: vec![InventoryItem::Block(block_hash)],
1057 };
1058
1059 for &peer_id in self.peer_states.keys() {
1060 if Some(peer_id) == from_peer {
1062 continue;
1063 }
1064 let _ = peer_manager.send_to_peer(peer_id, inv.clone()).await;
1065 }
1066
1067 Ok(())
1068 }
1069
1070 pub async fn announce_transaction(
1076 &self,
1077 txid: abtc_domain::primitives::Txid,
1078 from_peer: Option<u64>,
1079 peer_manager: &dyn PeerManager,
1080 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1081 let inv = NetworkMessage::Inv {
1082 items: vec![InventoryItem::Tx(txid)],
1083 };
1084
1085 for &peer_id in self.peer_states.keys() {
1086 if Some(peer_id) == from_peer {
1087 continue;
1088 }
1089 let _ = peer_manager.send_to_peer(peer_id, inv.clone()).await;
1090 }
1091
1092 Ok(())
1093 }
1094
1095 async fn on_addr(
1099 &mut self,
1100 peer_id: u64,
1101 addresses: Vec<(u32, SocketAddr)>,
1102 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
1103 let now = std::time::SystemTime::now()
1104 .duration_since(std::time::UNIX_EPOCH)
1105 .unwrap_or_default()
1106 .as_secs() as u32;
1107
1108 let mut added = 0usize;
1109 for (timestamp, addr) in &addresses {
1110 if now.saturating_sub(*timestamp) > MAX_ADDR_AGE {
1112 continue;
1113 }
1114 if let Some((existing_ts, _)) = self.known_addresses.get(addr) {
1116 if *existing_ts >= *timestamp {
1117 continue;
1118 }
1119 }
1120 self.known_addresses
1121 .insert(*addr, (*timestamp, OUR_SERVICES));
1122 added += 1;
1123 }
1124
1125 while self.known_addresses.len() > MAX_KNOWN_ADDRESSES {
1127 if let Some(oldest_addr) = self
1129 .known_addresses
1130 .iter()
1131 .min_by_key(|(_, (ts, _))| *ts)
1132 .map(|(addr, _)| *addr)
1133 {
1134 self.known_addresses.remove(&oldest_addr);
1135 } else {
1136 break;
1137 }
1138 }
1139
1140 if added > 0 {
1141 tracing::debug!(
1142 "Learned {} new addresses from peer {} (total known: {})",
1143 added,
1144 peer_id,
1145 self.known_addresses.len(),
1146 );
1147 }
1148
1149 Ok(Vec::new())
1150 }
1151
1152 async fn on_getaddr(
1154 &self,
1155 peer_id: u64,
1156 ) -> Result<Vec<SyncAction>, Box<dyn std::error::Error + Send + Sync>> {
1157 let now = std::time::SystemTime::now()
1158 .duration_since(std::time::UNIX_EPOCH)
1159 .unwrap_or_default()
1160 .as_secs() as u32;
1161
1162 let mut addrs: Vec<(u32, SocketAddr)> = self
1164 .known_addresses
1165 .iter()
1166 .filter(|(_, (ts, _))| now.saturating_sub(*ts) <= MAX_ADDR_AGE)
1167 .map(|(addr, (ts, _))| (*ts, *addr))
1168 .collect();
1169
1170 addrs.sort_by(|a, b| b.0.cmp(&a.0));
1172 addrs.truncate(MAX_ADDR_TO_SEND);
1173
1174 if addrs.is_empty() {
1175 return Ok(Vec::new());
1176 }
1177
1178 tracing::debug!(
1179 "Sending {} addresses to peer {} in response to getaddr",
1180 addrs.len(),
1181 peer_id,
1182 );
1183
1184 Ok(vec![SyncAction::SendMessage(
1185 peer_id,
1186 NetworkMessage::Addr { addresses: addrs },
1187 )])
1188 }
1189
1190 pub fn known_address_count(&self) -> usize {
1192 self.known_addresses.len()
1193 }
1194
1195 pub fn is_banned(&self, addr: &SocketAddr) -> bool {
1197 let now = std::time::SystemTime::now()
1198 .duration_since(std::time::UNIX_EPOCH)
1199 .map(|d| d.as_secs())
1200 .unwrap_or(0);
1201 self.peer_scoring.is_banned(addr, now)
1202 }
1203
1204 pub fn peer_ban_score(&self, peer_id: u64) -> i32 {
1206 self.peer_scoring.get_score(peer_id)
1207 }
1208
1209 pub fn orphan_tx_count(&self) -> usize {
1211 self.orphan_tx_pool.len()
1212 }
1213
1214 async fn request_headers_from_peer(
1216 &mut self,
1217 peer_id: u64,
1218 peer_manager: &dyn PeerManager,
1219 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1220 if let Some(state) = self.peer_states.get_mut(&peer_id) {
1221 if state.headers_sync_pending {
1222 return Ok(()); }
1224 state.headers_sync_pending = true;
1225 }
1226
1227 let locator = self.block_index.read().await.build_locator();
1228
1229 let msg = NetworkMessage::GetHeaders {
1230 version: 70016,
1231 block_locator: locator,
1232 hash_stop: BlockHash::zero(), };
1234
1235 peer_manager.send_to_peer(peer_id, msg).await?;
1236 tracing::debug!("Sent getheaders to peer {}", peer_id);
1237 Ok(())
1238 }
1239
1240 async fn transition_to_block_sync(
1242 &mut self,
1243 peer_manager: &dyn PeerManager,
1244 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1245 if self.state != SyncState::HeaderSync {
1246 return Ok(());
1247 }
1248
1249 let index = self.block_index.read().await;
1250 let best_height = index.best_height();
1251
1252 self.blocks_to_download.clear();
1254 for height in self.next_block_height..=best_height {
1255 if let Some(hash) = index.get_hash_at_height(height) {
1256 self.blocks_to_download.push_back(hash);
1257 }
1258 }
1259
1260 drop(index);
1261
1262 let count = self.blocks_to_download.len();
1263 if count == 0 {
1264 self.state = SyncState::Synced;
1265 tracing::info!("Already synced — no blocks to download");
1266 } else {
1267 self.state = SyncState::BlockSync;
1268 tracing::info!("Transitioning to block sync: {} blocks to download", count);
1269 self.request_blocks(peer_manager).await?;
1270 }
1271
1272 Ok(())
1273 }
1274
1275 async fn request_blocks(
1277 &mut self,
1278 peer_manager: &dyn PeerManager,
1279 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1280 let peer_ids: Vec<u64> = self.peer_states.keys().copied().collect();
1282 if peer_ids.is_empty() {
1283 return Ok(());
1284 }
1285
1286 let mut peer_idx = 0;
1287
1288 while self.blocks_in_flight.len() < MAX_BLOCKS_IN_FLIGHT {
1289 let hash = match self.blocks_to_download.pop_front() {
1290 Some(h) => h,
1291 None => break,
1292 };
1293
1294 let peer_id = peer_ids[peer_idx % peer_ids.len()];
1295 peer_idx += 1;
1296
1297 self.blocks_in_flight.insert(hash);
1298 if let Some(state) = self.peer_states.get_mut(&peer_id) {
1299 state.blocks_in_flight.insert(hash);
1300 }
1301
1302 let msg = NetworkMessage::GetData {
1303 items: vec![InventoryItem::Block(hash)],
1304 };
1305 let _ = peer_manager.send_to_peer(peer_id, msg).await;
1306 }
1307
1308 Ok(())
1309 }
1310}
1311
/// Work the caller of [`SyncManager`] must carry out after an event.
#[derive(Debug)]
pub enum SyncAction {
    /// A block that is next in height order and ready for processing.
    ProcessBlock(Block),
    /// A transaction that was neither accepted by the mempool nor taken by
    /// the orphan pool, handed to the caller for further processing.
    ProcessTransaction(Transaction),
    /// A transaction that the mempool accepted.
    AcceptedTransaction {
        /// The accepted transaction.
        tx: Transaction,
        /// Id of the peer that sent the transaction.
        from_peer: u64,
    },
    /// Send the message to the peer with the given id.
    SendMessage(u64, NetworkMessage),
    /// Disconnect the peer with the given id.
    DisconnectPeer(u64),
}
1331
1332#[cfg(test)]
1333mod tests {
1334 use super::*;
1335 use abtc_domain::primitives::Hash256;
1336
1337 fn make_header(prev: BlockHash, nonce: u32) -> BlockHeader {
1338 BlockHeader {
1339 version: 1,
1340 prev_block_hash: prev,
1341 merkle_root: Hash256::from_bytes([nonce as u8; 32]),
1342 time: 1231006505 + nonce,
1343 bits: 0x1d00ffff,
1344 nonce,
1345 }
1346 }
1347
1348 #[tokio::test]
1349 async fn test_sync_manager_creation() {
1350 let mut index = BlockIndex::new();
1351 let genesis = make_header(BlockHash::zero(), 0);
1352 index.init_genesis(genesis);
1353
1354 let index = Arc::new(RwLock::new(index));
1355 let manager = SyncManager::new(index);
1356
1357 assert_eq!(manager.state(), SyncState::Idle);
1358 assert_eq!(manager.blocks_remaining(), 0);
1359 }
1360
1361 #[tokio::test]
1362 async fn test_sync_state_transitions() {
1363 let mut index = BlockIndex::new();
1364 let genesis = make_header(BlockHash::zero(), 0);
1365 index.init_genesis(genesis);
1366
1367 let index = Arc::new(RwLock::new(index));
1368 let mut manager = SyncManager::new(index);
1369
1370 assert_eq!(manager.state(), SyncState::Idle);
1372
1373 let peer_info = PeerInfo {
1375 id: 1,
1376 addr: "127.0.0.1:8333".parse().unwrap(),
1377 services: 1,
1378 version: 70016,
1379 subver: "/test/".to_string(),
1380 start_height: 100,
1381 relay_txs: true,
1382 };
1383
1384 manager.peer_states.insert(
1386 1,
1387 PeerSyncState {
1388 info: peer_info,
1389 handshake: HandshakeState::Complete,
1390 headers_sync_pending: false,
1391 blocks_in_flight: HashSet::new(),
1392 last_header_received: None,
1393 },
1394 );
1395 manager.state = SyncState::HeaderSync;
1396
1397 assert_eq!(manager.state(), SyncState::HeaderSync);
1398 }
1399
1400 #[tokio::test]
1401 async fn test_peer_disconnect_reassigns_blocks() {
1402 let mut index = BlockIndex::new();
1403 let genesis = make_header(BlockHash::zero(), 0);
1404 let genesis_hash = genesis.block_hash();
1405 index.init_genesis(genesis);
1406
1407 let h1 = make_header(genesis_hash, 1);
1408 let (h1_hash, _) = index.add_header(h1).unwrap();
1409
1410 let index = Arc::new(RwLock::new(index));
1411 let mut manager = SyncManager::new(index);
1412
1413 let mut in_flight = HashSet::new();
1415 in_flight.insert(h1_hash);
1416 manager.blocks_in_flight.insert(h1_hash);
1417
1418 manager.peer_states.insert(
1419 1,
1420 PeerSyncState {
1421 info: PeerInfo {
1422 id: 1,
1423 addr: "127.0.0.1:8333".parse().unwrap(),
1424 services: 1,
1425 version: 70016,
1426 subver: "/test/".to_string(),
1427 start_height: 1,
1428 relay_txs: true,
1429 },
1430 handshake: HandshakeState::Complete,
1431 headers_sync_pending: false,
1432 blocks_in_flight: in_flight,
1433 last_header_received: None,
1434 },
1435 );
1436
1437 manager.on_peer_disconnected(1);
1439
1440 assert!(manager.blocks_in_flight.is_empty());
1441 assert_eq!(manager.blocks_to_download.len(), 1);
1442 assert_eq!(manager.blocks_to_download[0], h1_hash);
1443 }
1444
1445 fn build_chain(count: u32) -> (BlockIndex, Vec<BlockHash>) {
1446 let mut index = BlockIndex::new();
1447 let genesis = make_header(BlockHash::zero(), 0);
1448 let genesis_hash = genesis.block_hash();
1449 index.init_genesis(genesis);
1450
1451 let mut hashes = vec![genesis_hash];
1452 let mut prev = genesis_hash;
1453 for i in 1..=count {
1454 let h = make_header(prev, i);
1455 let (hash, _) = index.add_header(h).unwrap();
1456 hashes.push(hash);
1457 prev = hash;
1458 }
1459 (index, hashes)
1460 }
1461
1462 fn make_peer_state(id: u64) -> (u64, PeerSyncState) {
1463 (
1464 id,
1465 PeerSyncState {
1466 info: PeerInfo {
1467 id,
1468 addr: "127.0.0.1:8333".parse().unwrap(),
1469 services: 1,
1470 version: 70016,
1471 subver: "/test/".to_string(),
1472 start_height: 100,
1473 relay_txs: true,
1474 },
1475 handshake: HandshakeState::Complete,
1476 headers_sync_pending: false,
1477 blocks_in_flight: HashSet::new(),
1478 last_header_received: None,
1479 },
1480 )
1481 }
1482
1483 #[tokio::test]
1484 async fn test_on_getblocks_responds_with_inv() {
1485 let (index, hashes) = build_chain(5);
1486 let genesis_hash = hashes[0];
1487 let index = Arc::new(RwLock::new(index));
1488 let mut manager = SyncManager::new(index);
1489
1490 let actions = manager
1492 .on_getblocks(1, vec![genesis_hash], BlockHash::zero())
1493 .await
1494 .unwrap();
1495
1496 assert_eq!(actions.len(), 1);
1497 match &actions[0] {
1498 SyncAction::SendMessage(peer_id, NetworkMessage::Inv { items }) => {
1499 assert_eq!(*peer_id, 1);
1500 assert_eq!(items.len(), 5); match &items[0] {
1503 InventoryItem::Block(h) => assert_eq!(*h, hashes[1]),
1504 _ => panic!("Expected block inv item"),
1505 }
1506 }
1507 other => panic!("Expected SendMessage(Inv), got {:?}", other),
1508 }
1509 }
1510
1511 #[tokio::test]
1512 async fn test_on_getblocks_with_hash_stop() {
1513 let (index, hashes) = build_chain(5);
1514 let genesis_hash = hashes[0];
1515 let index = Arc::new(RwLock::new(index));
1516 let mut manager = SyncManager::new(index);
1517
1518 let actions = manager
1520 .on_getblocks(1, vec![genesis_hash], hashes[3])
1521 .await
1522 .unwrap();
1523
1524 assert_eq!(actions.len(), 1);
1525 match &actions[0] {
1526 SyncAction::SendMessage(_, NetworkMessage::Inv { items }) => {
1527 assert_eq!(items.len(), 3); }
1529 other => panic!("Expected SendMessage(Inv), got {:?}", other),
1530 }
1531 }
1532
1533 #[tokio::test]
1534 async fn test_on_getblocks_empty_when_caught_up() {
1535 let (index, hashes) = build_chain(3);
1536 let tip = *hashes.last().unwrap();
1537 let index = Arc::new(RwLock::new(index));
1538 let mut manager = SyncManager::new(index);
1539
1540 let actions = manager
1542 .on_getblocks(1, vec![tip], BlockHash::zero())
1543 .await
1544 .unwrap();
1545
1546 assert!(actions.is_empty());
1547 }
1548
1549 #[tokio::test]
1550 async fn test_on_getblocks_from_middle() {
1551 let (index, hashes) = build_chain(5);
1552 let index = Arc::new(RwLock::new(index));
1553 let mut manager = SyncManager::new(index);
1554
1555 let actions = manager
1557 .on_getblocks(1, vec![hashes[2]], BlockHash::zero())
1558 .await
1559 .unwrap();
1560
1561 assert_eq!(actions.len(), 1);
1562 match &actions[0] {
1563 SyncAction::SendMessage(_, NetworkMessage::Inv { items }) => {
1564 assert_eq!(items.len(), 3); match &items[0] {
1566 InventoryItem::Block(h) => assert_eq!(*h, hashes[3]),
1567 _ => panic!("Expected block inv item"),
1568 }
1569 }
1570 other => panic!("Expected SendMessage(Inv), got {:?}", other),
1571 }
1572 }
1573
1574 #[tokio::test]
1575 async fn test_on_getdata_serves_known_block() {
1576 let (index, hashes) = build_chain(3);
1577 let index = Arc::new(RwLock::new(index));
1578 let mut manager = SyncManager::new(index);
1579
1580 let block_store = MockBlockStore {
1582 blocks: {
1583 let mut m = HashMap::new();
1584 let block = Block {
1586 header: make_header(hashes[0], 1),
1587 transactions: Vec::new(),
1588 };
1589 m.insert(block.block_hash(), block);
1590 m
1591 },
1592 };
1593
1594 let actions = manager
1595 .on_getdata(1, vec![InventoryItem::Block(hashes[1])], &block_store)
1596 .await
1597 .unwrap();
1598
1599 assert_eq!(actions.len(), 1);
1600 match &actions[0] {
1601 SyncAction::SendMessage(peer_id, NetworkMessage::Block { block }) => {
1602 assert_eq!(*peer_id, 1);
1603 assert_eq!(block.block_hash(), hashes[1]);
1604 }
1605 other => panic!("Expected SendMessage(Block), got {:?}", other),
1606 }
1607 }
1608
1609 #[tokio::test]
1610 async fn test_on_getdata_unknown_block_skipped() {
1611 let (index, _hashes) = build_chain(1);
1612 let index = Arc::new(RwLock::new(index));
1613 let mut manager = SyncManager::new(index);
1614
1615 let block_store = MockBlockStore {
1616 blocks: HashMap::new(), };
1618
1619 let unknown_hash = BlockHash::from_hash(Hash256::from_bytes([0xFF; 32]));
1620 let actions = manager
1621 .on_getdata(1, vec![InventoryItem::Block(unknown_hash)], &block_store)
1622 .await
1623 .unwrap();
1624
1625 assert!(actions.is_empty());
1626 }
1627
1628 #[tokio::test]
1629 async fn test_announce_block_to_peers() {
1630 let (index, _hashes) = build_chain(1);
1631 let index = Arc::new(RwLock::new(index));
1632 let manager = SyncManager::new(index);
1633
1634 let stub = abtc_adapters::network::StubPeerManager::new();
1636 let block_hash = BlockHash::from_hash(Hash256::from_bytes([0x42; 32]));
1637
1638 manager
1640 .announce_block(block_hash, None, &stub)
1641 .await
1642 .unwrap();
1643 }
1644
1645 #[tokio::test]
1646 async fn test_announce_block_skips_sender() {
1647 let (index, _hashes) = build_chain(1);
1648 let index = Arc::new(RwLock::new(index));
1649 let mut manager = SyncManager::new(index);
1650
1651 let (id1, state1) = make_peer_state(1);
1653 let (id2, state2) = make_peer_state(2);
1654 manager.peer_states.insert(id1, state1);
1655 manager.peer_states.insert(id2, state2);
1656
1657 let stub = abtc_adapters::network::StubPeerManager::new();
1658 let block_hash = BlockHash::from_hash(Hash256::from_bytes([0x42; 32]));
1659
1660 manager
1664 .announce_block(block_hash, Some(1), &stub)
1665 .await
1666 .unwrap();
1667 }
1668
1669 struct MockBlockStore {
1671 blocks: HashMap<BlockHash, Block>,
1672 }
1673
1674 #[async_trait::async_trait]
1675 impl BlockStore for MockBlockStore {
1676 async fn store_block(
1677 &self,
1678 _block: &Block,
1679 _height: u32,
1680 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1681 Ok(())
1682 }
1683
1684 async fn get_block(
1685 &self,
1686 hash: &BlockHash,
1687 ) -> Result<Option<Block>, Box<dyn std::error::Error + Send + Sync>> {
1688 Ok(self.blocks.get(hash).cloned())
1689 }
1690
1691 async fn get_block_header(
1692 &self,
1693 _hash: &BlockHash,
1694 ) -> Result<Option<BlockHeader>, Box<dyn std::error::Error + Send + Sync>> {
1695 Ok(None)
1696 }
1697
1698 async fn has_block(
1699 &self,
1700 hash: &BlockHash,
1701 ) -> Result<bool, Box<dyn std::error::Error + Send + Sync>> {
1702 Ok(self.blocks.contains_key(hash))
1703 }
1704
1705 async fn get_best_block_hash(
1706 &self,
1707 ) -> Result<BlockHash, Box<dyn std::error::Error + Send + Sync>> {
1708 Ok(BlockHash::zero())
1709 }
1710
1711 async fn get_block_height(
1712 &self,
1713 _hash: &BlockHash,
1714 ) -> Result<Option<u32>, Box<dyn std::error::Error + Send + Sync>> {
1715 Ok(None)
1716 }
1717 }
1718
1719 struct MockMempool {
1723 txs: HashMap<abtc_domain::primitives::Txid, abtc_ports::MempoolEntry>,
1724 }
1725
1726 #[async_trait::async_trait]
1727 impl MempoolPort for MockMempool {
1728 async fn add_transaction(
1729 &self,
1730 _tx: &Transaction,
1731 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1732 Ok(())
1733 }
1734 async fn remove_transaction(
1735 &self,
1736 _txid: &abtc_domain::primitives::Txid,
1737 _recursive: bool,
1738 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1739 Ok(())
1740 }
1741 async fn get_transaction(
1742 &self,
1743 txid: &abtc_domain::primitives::Txid,
1744 ) -> Result<Option<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
1745 {
1746 Ok(self.txs.get(txid).cloned())
1747 }
1748 async fn get_all_transactions(
1749 &self,
1750 ) -> Result<Vec<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
1751 {
1752 Ok(self.txs.values().cloned().collect())
1753 }
1754 async fn get_transaction_count(
1755 &self,
1756 ) -> Result<u32, Box<dyn std::error::Error + Send + Sync>> {
1757 Ok(self.txs.len() as u32)
1758 }
1759 async fn estimate_fee(
1760 &self,
1761 _target_blocks: u32,
1762 ) -> Result<f64, Box<dyn std::error::Error + Send + Sync>> {
1763 Ok(1.0)
1764 }
1765 async fn get_mempool_info(
1766 &self,
1767 ) -> Result<abtc_ports::MempoolInfo, Box<dyn std::error::Error + Send + Sync>> {
1768 Ok(abtc_ports::MempoolInfo {
1769 size: self.txs.len() as u32,
1770 bytes: 0,
1771 usage: 0,
1772 max_mempool: 300_000_000,
1773 min_relay_fee: 0.00001,
1774 })
1775 }
1776 async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
1777 Ok(())
1778 }
1779 }
1780
1781 fn make_test_tx(value: i64) -> Transaction {
1782 use abtc_domain::primitives::{Amount, OutPoint, TxIn, TxOut, Txid};
1783 use abtc_domain::script::Script;
1784 let input = TxIn::final_input(OutPoint::new(Txid::zero(), 0), Script::new());
1785 let output = TxOut::new(Amount::from_sat(value), Script::new());
1786 Transaction::v1(vec![input], vec![output], 0)
1787 }
1788
1789 #[tokio::test]
1790 async fn test_announce_transaction_to_peers() {
1791 let (index, _hashes) = build_chain(1);
1792 let index = Arc::new(RwLock::new(index));
1793 let manager = SyncManager::new(index);
1794
1795 let stub = abtc_adapters::network::StubPeerManager::new();
1796 let txid = abtc_domain::primitives::Txid::zero();
1797
1798 manager
1800 .announce_transaction(txid, None, &stub)
1801 .await
1802 .unwrap();
1803 }
1804
1805 #[tokio::test]
1806 async fn test_announce_transaction_skips_sender() {
1807 let (index, _hashes) = build_chain(1);
1808 let index = Arc::new(RwLock::new(index));
1809 let mut manager = SyncManager::new(index);
1810
1811 let (id1, state1) = make_peer_state(1);
1812 let (id2, state2) = make_peer_state(2);
1813 manager.peer_states.insert(id1, state1);
1814 manager.peer_states.insert(id2, state2);
1815
1816 let stub = abtc_adapters::network::StubPeerManager::new();
1817 let txid = abtc_domain::primitives::Txid::zero();
1818
1819 manager
1821 .announce_transaction(txid, Some(1), &stub)
1822 .await
1823 .unwrap();
1824 }
1825
1826 #[tokio::test]
1827 async fn test_on_getdata_serves_transaction() {
1828 let (index, _hashes) = build_chain(1);
1829 let index = Arc::new(RwLock::new(index));
1830 let mut manager = SyncManager::new(index);
1831
1832 let tx = make_test_tx(5000);
1833 let txid = tx.txid();
1834
1835 let entry = abtc_ports::MempoolEntry {
1836 tx: tx.clone(),
1837 fee: abtc_domain::primitives::Amount::from_sat(100),
1838 size: 200,
1839 time: 0,
1840 height: 0,
1841 descendant_count: 0,
1842 descendant_size: 0,
1843 ancestor_count: 0,
1844 ancestor_size: 0,
1845 };
1846
1847 let mut txs = HashMap::new();
1848 txs.insert(txid, entry);
1849 let mempool = MockMempool { txs };
1850 let block_store = MockBlockStore {
1851 blocks: HashMap::new(),
1852 };
1853
1854 let actions = manager
1855 .on_getdata_with_mempool(1, vec![InventoryItem::Tx(txid)], &block_store, &mempool)
1856 .await
1857 .unwrap();
1858
1859 assert_eq!(actions.len(), 1);
1860 match &actions[0] {
1861 SyncAction::SendMessage(peer_id, NetworkMessage::Tx { tx: served_tx }) => {
1862 assert_eq!(*peer_id, 1);
1863 assert_eq!(served_tx.txid(), txid);
1864 }
1865 other => panic!("Expected SendMessage(Tx), got {:?}", other),
1866 }
1867 }
1868
1869 #[tokio::test]
1870 async fn test_on_getdata_unknown_tx_skipped() {
1871 let (index, _hashes) = build_chain(1);
1872 let index = Arc::new(RwLock::new(index));
1873 let mut manager = SyncManager::new(index);
1874
1875 let mempool = MockMempool {
1876 txs: HashMap::new(),
1877 };
1878 let block_store = MockBlockStore {
1879 blocks: HashMap::new(),
1880 };
1881
1882 let unknown_txid = abtc_domain::primitives::Txid::zero();
1883 let actions = manager
1884 .on_getdata_with_mempool(
1885 1,
1886 vec![InventoryItem::Tx(unknown_txid)],
1887 &block_store,
1888 &mempool,
1889 )
1890 .await
1891 .unwrap();
1892
1893 assert!(actions.is_empty());
1894 }
1895
1896 #[tokio::test]
1897 async fn test_on_inv_requests_unknown_tx() {
1898 let (index, _hashes) = build_chain(1);
1899 let index = Arc::new(RwLock::new(index));
1900 let mut manager = SyncManager::new(index);
1901
1902 let stub = abtc_adapters::network::StubPeerManager::new();
1903 let txid = abtc_domain::primitives::Txid::from_hash(Hash256::from_bytes([0xAB; 32]));
1904
1905 let actions = manager
1906 .on_inv(1, vec![InventoryItem::Tx(txid)], &stub)
1907 .await
1908 .unwrap();
1909
1910 assert!(!actions.is_empty());
1912 }
1913
1914 #[tokio::test]
1915 async fn test_on_inv_deduplicates_seen_tx() {
1916 let (index, _hashes) = build_chain(1);
1917 let index = Arc::new(RwLock::new(index));
1918 let mut manager = SyncManager::new(index);
1919
1920 let stub = abtc_adapters::network::StubPeerManager::new();
1921 let txid = abtc_domain::primitives::Txid::from_hash(Hash256::from_bytes([0xCD; 32]));
1922
1923 let actions1 = manager
1925 .on_inv(1, vec![InventoryItem::Tx(txid)], &stub)
1926 .await
1927 .unwrap();
1928 assert!(!actions1.is_empty());
1929
1930 let actions2 = manager
1932 .on_inv(1, vec![InventoryItem::Tx(txid)], &stub)
1933 .await
1934 .unwrap();
1935 let has_tx_request = actions2.iter().any(|a| {
1937 matches!(a, SyncAction::SendMessage(_, NetworkMessage::GetData { items })
1938 if items.iter().any(|i| matches!(i, InventoryItem::Tx(t) if *t == txid)))
1939 });
1940 assert!(!has_tx_request, "Should not re-request already-seen txid");
1941 }
1942
1943 #[tokio::test]
1946 async fn test_handshake_sends_version_on_connect() {
1947 let (index, _) = build_chain(1);
1948 let index = Arc::new(RwLock::new(index));
1949 let mut manager = SyncManager::new(index);
1950
1951 let stub = abtc_adapters::network::StubPeerManager::new();
1952 let peer_info = PeerInfo {
1953 id: 1,
1954 addr: "127.0.0.1:8333".parse().unwrap(),
1955 services: 1,
1956 version: 70016,
1957 subver: "/test/".to_string(),
1958 start_height: 0,
1959 relay_txs: true,
1960 };
1961
1962 let actions = manager.on_peer_connected(peer_info, &stub).await.unwrap();
1963
1964 assert_eq!(actions.len(), 1);
1966 assert!(matches!(
1967 &actions[0],
1968 SyncAction::SendMessage(1, NetworkMessage::Version { .. })
1969 ));
1970
1971 assert_eq!(
1973 manager.peer_states.get(&1).unwrap().handshake,
1974 HandshakeState::AwaitingVersion
1975 );
1976 }
1977
1978 #[tokio::test]
1979 async fn test_handshake_version_then_verack() {
1980 let (index, _) = build_chain(1);
1981 let index = Arc::new(RwLock::new(index));
1982 let mut manager = SyncManager::new(index);
1983
1984 let stub = abtc_adapters::network::StubPeerManager::new();
1985 let peer_info = PeerInfo {
1986 id: 1,
1987 addr: "127.0.0.1:8333".parse().unwrap(),
1988 services: 1,
1989 version: 70016,
1990 subver: "/test/".to_string(),
1991 start_height: 100,
1992 relay_txs: true,
1993 };
1994
1995 let _ = manager.on_peer_connected(peer_info, &stub).await.unwrap();
1997 assert_eq!(
1998 manager.peer_states.get(&1).unwrap().handshake,
1999 HandshakeState::AwaitingVersion
2000 );
2001
2002 let actions = manager
2004 .on_version(1, 70016, 1, "/remote/".to_string(), 200, true, &stub)
2005 .await
2006 .unwrap();
2007
2008 assert_eq!(actions.len(), 1);
2009 assert!(matches!(
2010 &actions[0],
2011 SyncAction::SendMessage(1, NetworkMessage::Verack)
2012 ));
2013 assert_eq!(
2014 manager.peer_states.get(&1).unwrap().handshake,
2015 HandshakeState::AwaitingVerack
2016 );
2017
2018 assert_eq!(manager.peer_states.get(&1).unwrap().info.start_height, 200);
2020 assert_eq!(manager.peer_states.get(&1).unwrap().info.subver, "/remote/");
2021
2022 let _ = manager.on_verack(1, &stub).await.unwrap();
2024 assert_eq!(
2025 manager.peer_states.get(&1).unwrap().handshake,
2026 HandshakeState::Complete
2027 );
2028 assert_eq!(manager.state(), SyncState::HeaderSync);
2030 }
2031
2032 #[tokio::test]
2033 async fn test_handshake_rejects_old_version() {
2034 let (index, _) = build_chain(1);
2035 let index = Arc::new(RwLock::new(index));
2036 let mut manager = SyncManager::new(index);
2037
2038 let stub = abtc_adapters::network::StubPeerManager::new();
2039 let peer_info = PeerInfo {
2040 id: 1,
2041 addr: "127.0.0.1:8333".parse().unwrap(),
2042 services: 1,
2043 version: 70016,
2044 subver: "/test/".to_string(),
2045 start_height: 0,
2046 relay_txs: true,
2047 };
2048
2049 let _ = manager.on_peer_connected(peer_info, &stub).await.unwrap();
2050
2051 let actions = manager
2053 .on_version(1, 70000, 1, "/old/".to_string(), 100, true, &stub)
2054 .await
2055 .unwrap();
2056
2057 assert_eq!(actions.len(), 1);
2059 assert!(matches!(&actions[0], SyncAction::DisconnectPeer(1)));
2060 }
2061
2062 #[tokio::test]
2063 async fn test_ping_responds_with_pong() {
2064 let (index, _hashes) = build_chain(1);
2065 let index = Arc::new(RwLock::new(index));
2066 let mut manager = SyncManager::new(index);
2067
2068 let stub = abtc_adapters::network::StubPeerManager::new();
2069 let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2070 let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2071 let mempool = abtc_adapters::mempool::InMemoryMempool::new();
2072
2073 let peer_info = PeerInfo {
2075 id: 1,
2076 addr: "127.0.0.1:8333".parse().unwrap(),
2077 services: 1,
2078 version: 70016,
2079 subver: "/test/".to_string(),
2080 start_height: 0,
2081 relay_txs: true,
2082 };
2083 manager.peer_states.insert(
2084 1,
2085 PeerSyncState {
2086 info: peer_info,
2087 handshake: HandshakeState::Complete,
2088 headers_sync_pending: false,
2089 blocks_in_flight: HashSet::new(),
2090 last_header_received: None,
2091 },
2092 );
2093
2094 let actions = manager
2095 .on_message_received(
2096 1,
2097 NetworkMessage::Ping { nonce: 42 },
2098 &stub,
2099 &block_store,
2100 &chain_state_store,
2101 &mempool,
2102 )
2103 .await
2104 .unwrap();
2105
2106 assert_eq!(actions.len(), 1);
2107 assert!(matches!(
2108 &actions[0],
2109 SyncAction::SendMessage(1, NetworkMessage::Pong { nonce: 42 })
2110 ));
2111 }
2112
2113 #[tokio::test]
2114 async fn test_build_version_message() {
2115 let (index, _) = build_chain(1);
2116 let index = Arc::new(RwLock::new(index));
2117 let manager = SyncManager::new(index);
2118
2119 let addr: std::net::SocketAddr = "1.2.3.4:8333".parse().unwrap();
2120 let msg = manager.build_version_message(addr, 500);
2121
2122 match msg {
2123 NetworkMessage::Version {
2124 version,
2125 services,
2126 user_agent,
2127 start_height,
2128 relay,
2129 ..
2130 } => {
2131 assert_eq!(version, OUR_PROTOCOL_VERSION);
2132 assert_eq!(services, OUR_SERVICES);
2133 assert!(user_agent.contains("agentic-bitcoin"));
2134 assert_eq!(start_height, 500);
2135 assert!(relay);
2136 }
2137 _ => panic!("Expected Version message"),
2138 }
2139 }
2140
2141 #[tokio::test]
2144 async fn test_on_addr_stores_addresses() {
2145 let (index, _) = build_chain(1);
2146 let index = Arc::new(RwLock::new(index));
2147 let mut manager = SyncManager::new(index);
2148
2149 let now = std::time::SystemTime::now()
2150 .duration_since(std::time::UNIX_EPOCH)
2151 .unwrap_or_default()
2152 .as_secs() as u32;
2153
2154 let addrs = vec![
2155 (now, "1.2.3.4:8333".parse().unwrap()),
2156 (now, "5.6.7.8:8333".parse().unwrap()),
2157 ];
2158
2159 let actions = manager.on_addr(1, addrs).await.unwrap();
2160 assert!(actions.is_empty()); assert_eq!(manager.known_address_count(), 2);
2162 }
2163
2164 #[tokio::test]
2165 async fn test_on_addr_rejects_old_addresses() {
2166 let (index, _) = build_chain(1);
2167 let index = Arc::new(RwLock::new(index));
2168 let mut manager = SyncManager::new(index);
2169
2170 let old_ts = std::time::SystemTime::now()
2172 .duration_since(std::time::UNIX_EPOCH)
2173 .unwrap_or_default()
2174 .as_secs() as u32
2175 - 4 * 60 * 60;
2176
2177 let addrs = vec![(old_ts, "1.2.3.4:8333".parse().unwrap())];
2178 manager.on_addr(1, addrs).await.unwrap();
2179 assert_eq!(manager.known_address_count(), 0);
2180 }
2181
2182 #[tokio::test]
2183 async fn test_on_getaddr_responds_with_known_addresses() {
2184 let (index, _) = build_chain(1);
2185 let index = Arc::new(RwLock::new(index));
2186 let mut manager = SyncManager::new(index);
2187
2188 let now = std::time::SystemTime::now()
2189 .duration_since(std::time::UNIX_EPOCH)
2190 .unwrap_or_default()
2191 .as_secs() as u32;
2192
2193 manager
2195 .known_addresses
2196 .insert("1.2.3.4:8333".parse().unwrap(), (now, OUR_SERVICES));
2197 manager
2198 .known_addresses
2199 .insert("5.6.7.8:8333".parse().unwrap(), (now - 60, OUR_SERVICES));
2200
2201 let actions = manager.on_getaddr(1).await.unwrap();
2202 assert_eq!(actions.len(), 1);
2203
2204 match &actions[0] {
2205 SyncAction::SendMessage(peer_id, NetworkMessage::Addr { addresses }) => {
2206 assert_eq!(*peer_id, 1);
2207 assert_eq!(addresses.len(), 2);
2208 }
2209 other => panic!("Expected SendMessage(Addr), got {:?}", other),
2210 }
2211 }
2212
2213 #[tokio::test]
2214 async fn test_on_getaddr_empty_when_no_addresses() {
2215 let (index, _) = build_chain(1);
2216 let index = Arc::new(RwLock::new(index));
2217 let manager = SyncManager::new(index);
2218
2219 let actions = manager.on_getaddr(1).await.unwrap();
2220 assert!(actions.is_empty());
2221 }
2222
2223 #[tokio::test]
2224 async fn test_handshake_sends_getaddr_after_verack() {
2225 let (index, _) = build_chain(1);
2226 let index = Arc::new(RwLock::new(index));
2227 let mut manager = SyncManager::new(index);
2228
2229 let stub = abtc_adapters::network::StubPeerManager::new();
2230 let peer_info = PeerInfo {
2231 id: 1,
2232 addr: "127.0.0.1:8333".parse().unwrap(),
2233 services: 1,
2234 version: 70016,
2235 subver: "/test/".to_string(),
2236 start_height: 100,
2237 relay_txs: true,
2238 };
2239
2240 let _ = manager.on_peer_connected(peer_info, &stub).await.unwrap();
2242 let _ = manager
2243 .on_version(1, 70016, 1, "/remote/".to_string(), 200, true, &stub)
2244 .await
2245 .unwrap();
2246 let actions = manager.on_verack(1, &stub).await.unwrap();
2247
2248 let has_getaddr = actions
2250 .iter()
2251 .any(|a| matches!(a, SyncAction::SendMessage(1, NetworkMessage::GetAddr)));
2252 assert!(has_getaddr, "Handshake should send GetAddr after Verack");
2253
2254 assert_eq!(manager.known_address_count(), 1);
2256 }
2257
2258 #[tokio::test]
2261 async fn test_tx_message_accepted_to_mempool() {
2262 let (index, _hashes) = build_chain(1);
2263 let index = Arc::new(RwLock::new(index));
2264 let mut manager = SyncManager::new(index);
2265
2266 let stub = abtc_adapters::network::StubPeerManager::new();
2267 let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2268 let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2269 let mempool = abtc_adapters::mempool::InMemoryMempool::new();
2270
2271 let peer_info = PeerInfo {
2273 id: 1,
2274 addr: "127.0.0.1:8333".parse().unwrap(),
2275 services: 1,
2276 version: 70016,
2277 subver: "/test/".to_string(),
2278 start_height: 0,
2279 relay_txs: true,
2280 };
2281 manager.peer_states.insert(
2282 1,
2283 PeerSyncState {
2284 info: peer_info,
2285 handshake: HandshakeState::Complete,
2286 headers_sync_pending: false,
2287 blocks_in_flight: HashSet::new(),
2288 last_header_received: None,
2289 },
2290 );
2291
2292 let tx = make_test_tx(5000);
2293 let actions = manager
2294 .on_message_received(
2295 1,
2296 NetworkMessage::Tx { tx: tx.clone() },
2297 &stub,
2298 &block_store,
2299 &chain_state_store,
2300 &mempool,
2301 )
2302 .await
2303 .unwrap();
2304
2305 assert!(!actions.is_empty());
2307 let has_accepted = actions
2308 .iter()
2309 .any(|a| matches!(a, SyncAction::AcceptedTransaction { .. }));
2310 assert!(
2311 has_accepted,
2312 "Valid tx should produce AcceptedTransaction action"
2313 );
2314
2315 assert_eq!(mempool.get_transaction_count().await.unwrap(), 1);
2317 }
2318
2319 #[tokio::test]
2320 async fn test_coinbase_tx_rejected() {
2321 let (index, _hashes) = build_chain(1);
2322 let index = Arc::new(RwLock::new(index));
2323 let mut manager = SyncManager::new(index);
2324
2325 let stub = abtc_adapters::network::StubPeerManager::new();
2326 let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2327 let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2328 let mempool = abtc_adapters::mempool::InMemoryMempool::new();
2329
2330 manager.peer_states.insert(
2331 1,
2332 PeerSyncState {
2333 info: PeerInfo {
2334 id: 1,
2335 addr: "127.0.0.1:8333".parse().unwrap(),
2336 services: 1,
2337 version: 70016,
2338 subver: "/test/".to_string(),
2339 start_height: 0,
2340 relay_txs: true,
2341 },
2342 handshake: HandshakeState::Complete,
2343 headers_sync_pending: false,
2344 blocks_in_flight: HashSet::new(),
2345 last_header_received: None,
2346 },
2347 );
2348
2349 let coinbase = Transaction::coinbase(
2351 1,
2352 abtc_domain::script::Script::from_bytes(vec![0x01, 0x01]),
2353 vec![abtc_domain::primitives::TxOut::new(
2354 abtc_domain::primitives::Amount::from_sat(5_000_000_000),
2355 abtc_domain::script::Script::new(),
2356 )],
2357 );
2358
2359 let actions = manager
2360 .on_message_received(
2361 1,
2362 NetworkMessage::Tx { tx: coinbase },
2363 &stub,
2364 &block_store,
2365 &chain_state_store,
2366 &mempool,
2367 )
2368 .await
2369 .unwrap();
2370
2371 let has_tx_action = actions.iter().any(|a| {
2373 matches!(
2374 a,
2375 SyncAction::AcceptedTransaction { .. } | SyncAction::ProcessTransaction(_)
2376 )
2377 });
2378 assert!(!has_tx_action, "Coinbase tx should be rejected from P2P");
2379 }
2380
2381 #[tokio::test]
2384 async fn test_rejected_tx_goes_to_orphan_pool() {
2385 let (index, _hashes) = build_chain(1);
2386 let index = Arc::new(RwLock::new(index));
2387 let mut manager = SyncManager::new(index);
2388
2389 let stub = abtc_adapters::network::StubPeerManager::new();
2390 let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2391 let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2392
2393 let mempool = RejectingMempool;
2395
2396 manager.peer_states.insert(
2397 1,
2398 PeerSyncState {
2399 info: PeerInfo {
2400 id: 1,
2401 addr: "127.0.0.1:8333".parse().unwrap(),
2402 services: 1,
2403 version: 70016,
2404 subver: "/test/".to_string(),
2405 start_height: 0,
2406 relay_txs: true,
2407 },
2408 handshake: HandshakeState::Complete,
2409 headers_sync_pending: false,
2410 blocks_in_flight: HashSet::new(),
2411 last_header_received: None,
2412 },
2413 );
2414
2415 let tx = make_test_tx(5000);
2416 let _ = manager
2417 .on_message_received(
2418 1,
2419 NetworkMessage::Tx { tx },
2420 &stub,
2421 &block_store,
2422 &chain_state_store,
2423 &mempool,
2424 )
2425 .await
2426 .unwrap();
2427
2428 assert_eq!(manager.orphan_tx_count(), 1);
2430 }
2431
2432 #[tokio::test]
2433 async fn test_peer_disconnect_clears_orphans() {
2434 let (index, _hashes) = build_chain(1);
2435 let index = Arc::new(RwLock::new(index));
2436 let mut manager = SyncManager::new(index);
2437
2438 let stub = abtc_adapters::network::StubPeerManager::new();
2439 let block_store = abtc_adapters::storage::InMemoryBlockStore::new();
2440 let chain_state_store = abtc_adapters::storage::InMemoryChainStateStore::new();
2441 let mempool = RejectingMempool;
2442
2443 manager.peer_states.insert(
2444 1,
2445 PeerSyncState {
2446 info: PeerInfo {
2447 id: 1,
2448 addr: "127.0.0.1:8333".parse().unwrap(),
2449 services: 1,
2450 version: 70016,
2451 subver: "/test/".to_string(),
2452 start_height: 0,
2453 relay_txs: true,
2454 },
2455 handshake: HandshakeState::Complete,
2456 headers_sync_pending: false,
2457 blocks_in_flight: HashSet::new(),
2458 last_header_received: None,
2459 },
2460 );
2461
2462 let tx1 = make_test_tx(5000);
2464 let tx2 = make_test_tx(6000);
2465 let _ = manager
2466 .on_message_received(
2467 1,
2468 NetworkMessage::Tx { tx: tx1 },
2469 &stub,
2470 &block_store,
2471 &chain_state_store,
2472 &mempool,
2473 )
2474 .await
2475 .unwrap();
2476 let _ = manager
2477 .on_message_received(
2478 1,
2479 NetworkMessage::Tx { tx: tx2 },
2480 &stub,
2481 &block_store,
2482 &chain_state_store,
2483 &mempool,
2484 )
2485 .await
2486 .unwrap();
2487 assert_eq!(manager.orphan_tx_count(), 2);
2488
2489 manager.on_peer_disconnected(1);
2491 assert_eq!(manager.orphan_tx_count(), 0);
2492 }
2493
2494 struct RejectingMempool;
2496
2497 #[async_trait::async_trait]
2498 impl MempoolPort for RejectingMempool {
2499 async fn add_transaction(
2500 &self,
2501 _tx: &Transaction,
2502 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2503 Err("missing inputs".into())
2504 }
2505 async fn remove_transaction(
2506 &self,
2507 _txid: &abtc_domain::primitives::Txid,
2508 _recursive: bool,
2509 ) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2510 Ok(())
2511 }
2512 async fn get_transaction(
2513 &self,
2514 _txid: &abtc_domain::primitives::Txid,
2515 ) -> Result<Option<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
2516 {
2517 Ok(None)
2518 }
2519 async fn get_all_transactions(
2520 &self,
2521 ) -> Result<Vec<abtc_ports::MempoolEntry>, Box<dyn std::error::Error + Send + Sync>>
2522 {
2523 Ok(Vec::new())
2524 }
2525 async fn get_transaction_count(
2526 &self,
2527 ) -> Result<u32, Box<dyn std::error::Error + Send + Sync>> {
2528 Ok(0)
2529 }
2530 async fn estimate_fee(
2531 &self,
2532 _target_blocks: u32,
2533 ) -> Result<f64, Box<dyn std::error::Error + Send + Sync>> {
2534 Ok(1.0)
2535 }
2536 async fn get_mempool_info(
2537 &self,
2538 ) -> Result<abtc_ports::MempoolInfo, Box<dyn std::error::Error + Send + Sync>> {
2539 Ok(abtc_ports::MempoolInfo {
2540 size: 0,
2541 bytes: 0,
2542 usage: 0,
2543 max_mempool: 300_000_000,
2544 min_relay_fee: 0.00001,
2545 })
2546 }
2547 async fn clear(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
2548 Ok(())
2549 }
2550 }
2551}