1#![allow(clippy::manual_range_contains)]
3use std::collections::HashSet;
4use std::sync::Arc;
5use std::{fmt, io, net};
6
7use nakamoto_common::bitcoin::network::constants::ServiceFlags;
8use nakamoto_common::bitcoin::{Transaction, Txid};
9use nakamoto_common::block::{Block, BlockHash, BlockHeader, Height};
10use nakamoto_net::event::Emitter;
11use nakamoto_net::Disconnect;
12use nakamoto_p2p::fsm;
13use nakamoto_p2p::fsm::fees::FeeEstimate;
14use nakamoto_p2p::fsm::{Link, PeerId};
15
/// Progress notification about a single header being loaded or verified.
///
/// Every variant carries the height of the header concerned. The
/// `Display` implementation renders each variant as a short message.
#[derive(Clone, Debug)]
pub enum Loading {
    /// A block header was loaded.
    BlockHeaderLoaded {
        /// Height of the loaded block header.
        height: Height,
    },
    /// A filter header was loaded.
    FilterHeaderLoaded {
        /// Height of the loaded filter header.
        height: Height,
    },
    /// A filter header was verified.
    FilterHeaderVerified {
        /// Height of the verified filter header.
        height: Height,
    },
}
38
39impl fmt::Display for Loading {
40 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
41 match self {
42 Self::BlockHeaderLoaded { height } => {
43 write!(fmt, "block header #{} loaded", height)
44 }
45 Self::FilterHeaderLoaded { height } => {
46 write!(fmt, "filter header #{} loaded", height)
47 }
48 Self::FilterHeaderVerified { height } => {
49 write!(fmt, "filter header #{} verified", height)
50 }
51 }
52 }
53}
54
/// Event emitted by the client.
///
/// Within this file, values of this type are produced by `Mapper::process`,
/// which translates `fsm::Event`s into these client-facing events.
#[derive(Debug, Clone)]
pub enum Event {
    /// The client is ready to process events and commands.
    Ready {
        /// Chain height reported by the state machine's `Ready` event.
        tip: Height,
        /// Filter height reported by the state machine's `Ready` event.
        filter_tip: Height,
    },
    /// A peer connected.
    PeerConnected {
        /// Peer address.
        addr: PeerId,
        /// Link of the connection (e.g. `Link::Inbound`).
        link: Link,
    },
    /// A peer disconnected.
    PeerDisconnected {
        /// Peer address.
        addr: PeerId,
        /// Reason for the disconnection.
        reason: Disconnect<fsm::DisconnectReason>,
    },
    /// A connection attempt to a peer failed.
    PeerConnectionFailed {
        /// Peer address.
        addr: PeerId,
        /// The I/O error the attempt failed with.
        error: Arc<io::Error>,
    },
    /// A peer was negotiated with.
    ///
    /// All fields are forwarded unchanged from `fsm::PeerEvent::Negotiated`.
    PeerNegotiated {
        /// Peer address.
        addr: PeerId,
        /// Link of the connection.
        link: Link,
        /// Service flags of the peer.
        services: ServiceFlags,
        /// Height of the peer.
        // NOTE(review): the tests suggest this is the `start_height` of the
        // peer's version message — confirm against the state machine.
        height: Height,
        /// User agent of the peer.
        user_agent: String,
        /// Protocol version of the peer.
        version: u32,
    },
    /// The peer height was updated.
    ///
    /// Forwarded from `fsm::ChainEvent::PeerHeightUpdated`.
    PeerHeightUpdated {
        /// The new height.
        height: Height,
    },
    /// A block was connected.
    BlockConnected {
        /// Header of the block.
        header: BlockHeader,
        /// Hash of the block, computed from `header`.
        hash: BlockHash,
        /// Height of the block.
        height: Height,
    },
    /// A block was disconnected.
    BlockDisconnected {
        /// Header of the block.
        header: BlockHeader,
        /// Hash of the block, computed from `header`.
        hash: BlockHash,
        /// Height of the block.
        height: Height,
    },
    /// A block whose filter matched was received and is ready to be
    /// processed.
    ///
    /// Only emitted for heights that a previous `FilterProcessed` event
    /// reported as matched.
    BlockMatched {
        /// Hash of the block.
        hash: BlockHash,
        /// Header of the block.
        header: BlockHeader,
        /// Height of the block.
        height: Height,
        /// Transactions contained in the block.
        transactions: Vec<Transaction>,
    },
    /// A fee estimate is available for a processed block.
    FeeEstimated {
        /// Hash of the block the estimate refers to.
        block: BlockHash,
        /// Height of that block.
        height: Height,
        /// The fee estimate.
        fees: FeeEstimate,
    },
    /// A filter was processed.
    FilterProcessed {
        /// Hash of the block the filter belongs to.
        block: BlockHash,
        /// Height of that block.
        height: Height,
        /// Whether the filter matched.
        matched: bool,
        /// Validity flag forwarded from the state machine.
        // NOTE(review): exact meaning of `valid` is not visible in this file.
        valid: bool,
    },
    /// The status of a transaction changed.
    TxStatusChanged {
        /// Id of the transaction.
        txid: Txid,
        /// The new status.
        status: TxStatus,
    },
    /// The sync height advanced.
    ///
    /// Emitted by `Mapper::process` whenever the height up to which filters
    /// (and, where matched, their blocks) have been processed increases.
    Synced {
        /// The new sync height.
        height: Height,
        /// Chain tip last reported by `fsm::ChainEvent::Synced`.
        tip: Height,
    },
}
185
186impl fmt::Display for Event {
187 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
188 match self {
189 Self::Ready { .. } => {
190 write!(fmt, "ready to process events and commands")
191 }
192 Self::BlockConnected { hash, height, .. } => {
193 write!(fmt, "block {} connected at height {}", hash, height)
194 }
195 Self::BlockDisconnected { hash, height, .. } => {
196 write!(fmt, "block {} disconnected at height {}", hash, height)
197 }
198 Self::BlockMatched { hash, height, .. } => {
199 write!(
200 fmt,
201 "block {} ready to be processed at height {}",
202 hash, height
203 )
204 }
205 Self::FeeEstimated { fees, height, .. } => {
206 write!(
207 fmt,
208 "transaction median fee rate for block #{} is {} sat/vB",
209 height, fees.median,
210 )
211 }
212 Self::FilterProcessed {
213 height, matched, ..
214 } => {
215 write!(
216 fmt,
217 "filter processed at height {} (match = {})",
218 height, matched
219 )
220 }
221 Self::TxStatusChanged { txid, status } => {
222 write!(fmt, "transaction {} status changed: {}", txid, status)
223 }
224 Self::Synced { height, .. } => write!(fmt, "filters synced up to height {}", height),
225 Self::PeerConnected { addr, link } => {
226 write!(fmt, "peer {} connected ({:?})", &addr, link)
227 }
228 Self::PeerConnectionFailed { addr, error } => {
229 write!(
230 fmt,
231 "peer connection attempt to {} failed with {}",
232 &addr, error
233 )
234 }
235 Self::PeerHeightUpdated { height } => {
236 write!(fmt, "peer height updated to {}", height)
237 }
238 Self::PeerDisconnected { addr, reason } => {
239 write!(fmt, "disconnected from {} ({})", &addr, reason)
240 }
241 Self::PeerNegotiated {
242 addr,
243 height,
244 services,
245 ..
246 } => write!(
247 fmt,
248 "peer {} negotiated with services {} and height {}..",
249 addr, services, height
250 ),
251 }
252 }
253}
254
/// Status of a transaction.
///
/// `PartialOrd`/`Ord` are derived, so the declaration order of the variants
/// defines their ordering:
/// `Unconfirmed < Acknowledged < Confirmed < Reverted < Stale`.
/// Do not reorder the variants without reviewing code that compares statuses.
#[derive(Debug, Clone, PartialOrd, Ord, PartialEq, Eq)]
pub enum TxStatus {
    /// The transaction is unconfirmed.
    Unconfirmed,
    /// The transaction was acknowledged by a peer.
    Acknowledged {
        /// Address of the peer that acknowledged the transaction.
        peer: net::SocketAddr,
    },
    /// The transaction was included in a block.
    Confirmed {
        /// Height of the block the transaction was included in.
        height: Height,
        /// Hash of the block the transaction was included in.
        block: BlockHash,
    },
    /// The transaction has been reverted.
    Reverted,
    /// The transaction was replaced by another transaction.
    Stale {
        /// Id of the transaction that replaced this one.
        replaced_by: Txid,
        /// Hash of the block containing the replacing transaction.
        block: BlockHash,
    },
}
292
293impl fmt::Display for TxStatus {
294 fn fmt(&self, fmt: &mut fmt::Formatter<'_>) -> fmt::Result {
295 match self {
296 Self::Unconfirmed => write!(fmt, "transaction is unconfirmed"),
297 Self::Acknowledged { peer } => {
298 write!(fmt, "transaction was acknowledged by peer {}", peer)
299 }
300 Self::Confirmed { height, block } => write!(
301 fmt,
302 "transaction was included in block {} at height {}",
303 block, height
304 ),
305 Self::Reverted => write!(fmt, "transaction has been reverted"),
306 Self::Stale { replaced_by, block } => write!(
307 fmt,
308 "transaction was replaced by {} in block {}",
309 replaced_by, block
310 ),
311 }
312 }
313}
314
/// Maps state machine events (`fsm::Event`) to client [`Event`]s, keeping
/// track of how far filters and matched blocks have been processed.
pub(crate) struct Mapper {
    /// Chain height last reported by `fsm::ChainEvent::Synced`; included in
    /// every emitted `Event::Synced`.
    tip: Height,
    /// Height last reported through `Event::Synced` (or the start height of
    /// the most recent rescan).
    sync_height: Height,
    /// Height of the most recently processed filter.
    filter_height: Height,
    /// Height of the most recently processed block whose filter matched.
    /// Asserted to never exceed `filter_height`.
    block_height: Height,
    /// Heights whose filter matched and whose block has not been processed
    /// yet.
    pending: HashSet<Height>,
}
331
332impl Default for Mapper {
333 fn default() -> Self {
335 let tip = 0;
336 let sync_height = 0;
337 let filter_height = 0;
338 let block_height = 0;
339 let pending = HashSet::new();
340
341 Self {
342 tip,
343 sync_height,
344 filter_height,
345 block_height,
346 pending,
347 }
348 }
349}
350
351impl Mapper {
352 pub fn process(&mut self, event: fsm::Event, emitter: &Emitter<Event>) {
354 match event {
355 fsm::Event::Ready {
356 height,
357 filter_height,
358 ..
359 } => {
360 emitter.emit(Event::Ready {
361 tip: height,
362 filter_tip: filter_height,
363 });
364 }
365 fsm::Event::Peer(fsm::PeerEvent::Connected(addr, link)) => {
366 emitter.emit(Event::PeerConnected { addr, link });
367 }
368 fsm::Event::Peer(fsm::PeerEvent::ConnectionFailed(addr, error)) => {
369 emitter.emit(Event::PeerConnectionFailed { addr, error });
370 }
371 fsm::Event::Peer(fsm::PeerEvent::Negotiated {
372 addr,
373 link,
374 services,
375 user_agent,
376 height,
377 version,
378 }) => {
379 emitter.emit(Event::PeerNegotiated {
380 addr,
381 link,
382 services,
383 user_agent,
384 height,
385 version,
386 });
387 }
388 fsm::Event::Peer(fsm::PeerEvent::Disconnected(addr, reason)) => {
389 emitter.emit(Event::PeerDisconnected { addr, reason });
390 }
391 fsm::Event::Chain(fsm::ChainEvent::PeerHeightUpdated { height }) => {
392 emitter.emit(Event::PeerHeightUpdated { height });
393 }
394 fsm::Event::Chain(fsm::ChainEvent::Synced(_, height)) => {
395 self.tip = height;
396 }
397 fsm::Event::Chain(fsm::ChainEvent::BlockConnected { header, height }) => {
398 emitter.emit(Event::BlockConnected {
399 header,
400 hash: header.block_hash(),
401 height,
402 });
403 }
404 fsm::Event::Chain(fsm::ChainEvent::BlockDisconnected { header, height }) => {
405 emitter.emit(Event::BlockDisconnected {
406 header,
407 hash: header.block_hash(),
408 height,
409 });
410 }
411 fsm::Event::Inventory(fsm::InventoryEvent::BlockProcessed {
412 block,
413 height,
414 fees,
415 }) => {
416 let hash = self.process_block(block, height, emitter);
417
418 if let Some(fees) = fees {
419 emitter.emit(Event::FeeEstimated {
420 block: hash,
421 height,
422 fees,
423 });
424 }
425 }
426 fsm::Event::Inventory(fsm::InventoryEvent::Confirmed {
427 transaction,
428 height,
429 block,
430 }) => {
431 emitter.emit(Event::TxStatusChanged {
432 txid: transaction.txid(),
433 status: TxStatus::Confirmed { height, block },
434 });
435 }
436 fsm::Event::Inventory(fsm::InventoryEvent::Acknowledged { txid, peer }) => {
437 emitter.emit(Event::TxStatusChanged {
438 txid,
439 status: TxStatus::Acknowledged { peer },
440 });
441 }
442 fsm::Event::Filter(fsm::FilterEvent::RescanStarted { start, .. }) => {
443 self.pending.clear();
444
445 self.filter_height = start;
446 self.sync_height = start;
447 self.block_height = start;
448 }
449 fsm::Event::Filter(fsm::FilterEvent::FilterProcessed {
450 block,
451 height,
452 matched,
453 valid,
454 ..
455 }) => {
456 self.process_filter(block, height, matched, valid, emitter);
457 }
458 _ => {}
459 }
460 assert!(
461 self.block_height <= self.filter_height,
462 "Filters are processed before blocks"
463 );
464 assert!(
465 self.sync_height <= self.filter_height,
466 "Filters are processed before we are done"
467 );
468
469 let height = if self.pending.is_empty() {
472 self.filter_height
473 } else {
474 self.block_height
475 };
476
477 if height > self.sync_height {
479 self.sync_height = height;
480
481 emitter.emit(Event::Synced {
482 height,
483 tip: self.tip,
484 });
485 }
486 }
487
488 fn process_block(
492 &mut self,
493 block: Block,
494 height: Height,
495 emitter: &Emitter<Event>,
496 ) -> BlockHash {
497 let hash = block.block_hash();
498
499 if !self.pending.remove(&height) {
500 return hash;
502 }
503
504 log::debug!("Received block {} at height {}", hash, height);
505 debug_assert!(height >= self.block_height);
506
507 self.block_height = height;
508
509 emitter.emit(Event::BlockMatched {
510 height,
511 hash,
512 header: block.header,
513 transactions: block.txdata,
514 });
515
516 hash
517 }
518
519 fn process_filter(
520 &mut self,
521 block: BlockHash,
522 height: Height,
523 matched: bool,
524 valid: bool,
525 emitter: &Emitter<Event>,
526 ) {
527 debug_assert!(height >= self.filter_height);
528
529 if matched {
530 log::debug!("Filter matched for block #{}", height);
531 self.pending.insert(height);
532 }
533 self.filter_height = height;
534
535 emitter.emit(Event::FilterProcessed {
536 height,
537 matched,
538 valid,
539 block,
540 });
541 }
542}
543
544#[cfg(test)]
545mod test {
546 use std::io;
585
586 use nakamoto_common::bitcoin_hashes::Hash;
587 use quickcheck::TestResult;
588 use quickcheck_macros::quickcheck;
589
590 use nakamoto_common::block::time::Clock as _;
591 use nakamoto_common::network::Network;
592 use nakamoto_net::{Disconnect, Link, LocalTime, StateMachine as _};
593 use nakamoto_test::assert_matches;
594 use nakamoto_test::block::gen;
595
596 use super::Event;
597 use super::*;
598
599 use crate::handle::Handle as _;
600 use crate::tests::mock;
601 use crate::Command;
602
    /// After the protocol is initialized and the client is stepped once, an
    /// `Event::Ready` is available to event subscribers.
    #[test]
    fn test_ready_event() {
        let network = Network::Regtest;
        let mut client = mock::Client::new(network);
        let handle = client.handle();
        // Subscribe before driving the client so the event is not missed.
        let events = handle.events();
        let time = LocalTime::now();

        client.protocol.initialize(time);
        client.step();

        assert_matches!(events.try_recv(), Ok(Event::Ready { .. }));
    }
616
    /// An inbound connection followed by a connection error yields
    /// `PeerConnected` and then `PeerDisconnected` for the same address.
    #[test]
    fn test_peer_connected_disconnected() {
        let network = Network::Regtest;
        let mut client = mock::Client::new(network);
        let handle = client.handle();
        let remote = ([44, 44, 44, 44], 8333).into();
        let local_addr = ([0, 0, 0, 0], 16333).into();
        let events = handle.events();

        // Connect.
        client
            .protocol
            .connected(remote, &local_addr, Link::Inbound);
        client.step();

        assert_matches!(
            events.try_recv(),
            Ok(Event::PeerConnected { addr, link, .. })
            if addr == remote && link == Link::Inbound
        );

        // Disconnect with a connection error.
        client.protocol.disconnected(
            &remote,
            Disconnect::ConnectionError(io::Error::from(io::ErrorKind::UnexpectedEof).into()),
        );
        client.step();

        assert_matches!(
            events.try_recv(),
            Ok(Event::PeerDisconnected { addr, reason: Disconnect::ConnectionError(_) })
            if addr == remote
        );
    }
649
    /// A disconnection that happens after a connection was attempted, but
    /// before it was established, is reported as `PeerConnectionFailed`
    /// carrying the original error kind.
    #[test]
    fn test_peer_connection_failed() {
        let network = Network::Regtest;
        let mut client = mock::Client::new(network);
        let handle = client.handle();
        let remote = ([44, 44, 44, 44], 8333).into();
        let events = handle.events();

        client.protocol.command(Command::Connect(remote));
        client.protocol.attempted(&remote);
        client.step();

        // Merely attempting a connection does not produce a client event.
        assert_matches!(events.try_recv(), Err(_));

        client.protocol.disconnected(
            &remote,
            Disconnect::ConnectionError(io::Error::from(io::ErrorKind::UnexpectedEof).into()),
        );
        client.step();

        assert_matches!(
            events.try_recv(),
            Ok(Event::PeerConnectionFailed { addr, error })
            if addr == remote && error.kind() == io::ErrorKind::UnexpectedEof
        );
    }
676
    /// Completing the handshake with peers that announce start heights 42
    /// and then 43 yields a `PeerHeightUpdated` event for each height.
    #[test]
    fn test_peer_height_updated() {
        use nakamoto_common::bitcoin::network::address::Address;
        use nakamoto_common::bitcoin::network::constants::ServiceFlags;
        use nakamoto_common::bitcoin::network::message::NetworkMessage;
        use nakamoto_common::bitcoin::network::message_network::VersionMessage;

        let network = Network::default();
        let mut client = mock::Client::new(network);
        let handle = client.handle();
        let remote = ([44, 44, 44, 44], 8333).into();
        let local_time = LocalTime::now();
        let local_addr = ([0, 0, 0, 0], 16333).into();
        let events = handle.events();

        // Builds a `version` message announcing the given start height.
        // Note that it always uses the first `remote` as receiver address,
        // also for the second peer below.
        let version = |height: Height| -> NetworkMessage {
            NetworkMessage::Version(VersionMessage {
                version: fsm::MIN_PROTOCOL_VERSION,
                services: ServiceFlags::NETWORK,
                timestamp: local_time.block_time() as i64,
                receiver: Address::new(&remote, ServiceFlags::NONE),
                sender: Address::new(&local_addr, ServiceFlags::NONE),
                nonce: 42,
                user_agent: "?".to_owned(),
                start_height: height as i32,
                relay: false,
            })
        };

        // First peer, at height 42.
        client
            .protocol
            .connected(remote, &local_addr, Link::Inbound);
        client.received(&remote, version(42));
        client.received(&remote, NetworkMessage::Verack);
        client.step();

        // Other events may be queued as well; search for the one we expect.
        events
            .try_iter()
            .find(|e| matches!(e, Event::PeerHeightUpdated { height } if *height == 42))
            .expect("We receive an event for the updated peer height");

        // Second peer, at height 43.
        let remote = ([45, 45, 45, 45], 8333).into();

        client
            .protocol
            .connected(remote, &local_addr, Link::Inbound);
        client.received(&remote, version(43));
        client.received(&remote, NetworkMessage::Verack);
        client.step();

        events
            .try_iter()
            .find(|e| matches!(e, Event::PeerHeightUpdated { height } if *height == 43))
            .expect("We receive an event for the updated peer height");
    }
732
    /// Completing the handshake (`version` + `verack`) with a connected peer
    /// yields `PeerConnected` followed by `PeerNegotiated`, the latter
    /// carrying the height and user agent from the peer's version message.
    #[test]
    fn test_peer_negotiated() {
        use nakamoto_common::bitcoin::network::address::Address;
        use nakamoto_common::bitcoin::network::constants::ServiceFlags;
        use nakamoto_common::bitcoin::network::message::NetworkMessage;
        use nakamoto_common::bitcoin::network::message_network::VersionMessage;

        let network = Network::default();
        let mut client = mock::Client::new(network);
        let handle = client.handle();
        let remote = ([44, 44, 44, 44], 8333).into();
        let local_time = LocalTime::now();
        let local_addr = ([0, 0, 0, 0], 16333).into();
        let events = handle.events();

        client
            .protocol
            .connected(remote, &local_addr, Link::Inbound);
        client.step();

        // Version message of the remote peer.
        let version = NetworkMessage::Version(VersionMessage {
            version: fsm::MIN_PROTOCOL_VERSION,
            services: ServiceFlags::NETWORK,
            timestamp: local_time.block_time() as i64,
            receiver: Address::new(&remote, ServiceFlags::NONE),
            sender: Address::new(&local_addr, ServiceFlags::NONE),
            nonce: 42,
            user_agent: "?".to_owned(),
            start_height: 42,
            relay: false,
        });

        client.received(&remote, version);
        client.received(&remote, NetworkMessage::Verack);
        client.step();

        // Events arrive in order: connection first, then negotiation.
        assert_matches!(events.try_recv(), Ok(Event::PeerConnected { .. }));
        assert_matches!(
            events.try_recv(),
            Ok(Event::PeerNegotiated { addr, height, user_agent, .. })
            if addr == remote && height == 42 && user_agent == "?"
        );
    }
776
    /// Property: when filters (and the blocks of matching filters) are fed
    /// to the event mapper from `birth` up to `height`, the outputs found in
    /// the emitted `BlockMatched` events that pay to watched scripts add up
    /// to the expected balance, and every `Synced` event reports `height` as
    /// the tip.
    #[quickcheck]
    fn prop_client_side_filtering(birth: Height, height: Height, seed: u64) -> TestResult {
        // Keep the generated chain small and require a non-empty scan range.
        if height < 1 || height > 24 || birth >= height {
            return TestResult::discard();
        }

        let mut rng = fastrand::Rng::with_seed(seed);
        let network = Network::Regtest;
        let genesis = network.genesis_block();
        let chain = gen::blockchain(genesis, height, &mut rng);
        let mut mock = mock::Client::new(network);
        let mut client = mock.handle();

        client.tip = (height, chain[height as usize].header);

        // Sum of the values of all outputs seen that pay to a watched script.
        let mut spent = 0;
        // NOTE(review): presumably `heights` are the block heights containing
        // watched outputs and `balance` their total value — confirm against
        // `gen::watchlist_rng`.
        let (watch, heights, balance) = gen::watchlist_rng(birth, chain.iter(), &mut rng);

        log::debug!(
            "-- Test case with birth = {} and height = {}",
            birth,
            height
        );
        let subscriber = client.events();

        // Tell the mapper where the chain tip is.
        mock.subscriber
            .broadcast(fsm::Event::Chain(fsm::ChainEvent::Synced(
                chain.last().block_hash(),
                height,
            )));

        // Feed one filter per height; a matching filter is immediately
        // followed by its block.
        for h in birth..=height {
            let matched = heights.contains(&h);
            let block = chain[h as usize].clone();

            mock.subscriber
                .broadcast(fsm::Event::Filter(fsm::FilterEvent::FilterProcessed {
                    block: block.block_hash(),
                    height: h,
                    matched,
                    cached: false,
                    valid: true,
                }));

            if matched {
                mock.subscriber.broadcast(fsm::Event::Inventory(
                    fsm::InventoryEvent::BlockProcessed {
                        block,
                        height: h,
                        fees: None,
                    },
                ));
            }
        }

        for event in subscriber.try_iter() {
            match event {
                Event::BlockMatched { transactions, .. } => {
                    for t in &transactions {
                        for output in &t.output {
                            if watch.contains(&output.script_pubkey) {
                                spent += output.value;
                            }
                        }
                    }
                }
                Event::Synced {
                    height: sync_height,
                    tip,
                } => {
                    assert_eq!(height, tip);

                    // Stop once we are synced up to the tip.
                    if sync_height == tip {
                        break;
                    }
                }
                _ => {}
            }
        }
        assert_eq!(balance, spent);
        client.shutdown().unwrap();

        TestResult::passed()
    }
861
862 #[test]
863 fn test_tx_status_ordering() {
864 assert!(
865 TxStatus::Unconfirmed
866 < TxStatus::Acknowledged {
867 peer: ([0, 0, 0, 0], 0).into()
868 }
869 );
870 assert!(
871 TxStatus::Acknowledged {
872 peer: ([0, 0, 0, 0], 0).into()
873 } < TxStatus::Confirmed {
874 height: 0,
875 block: BlockHash::all_zeros(),
876 }
877 );
878 assert!(
879 TxStatus::Confirmed {
880 height: 0,
881 block: BlockHash::all_zeros(),
882 } < TxStatus::Reverted
883 );
884 assert!(
885 TxStatus::Reverted
886 < TxStatus::Stale {
887 replaced_by: Txid::all_zeros(),
888 block: BlockHash::all_zeros()
889 }
890 );
891 }
892}