1#![warn(missing_docs)]
3use crossbeam_channel as chan;
4use log::*;
5
6pub mod event;
7pub mod fees;
8pub mod filter_cache;
9pub mod output;
10
11mod addrmgr;
13mod cbfmgr;
14mod invmgr;
15mod peermgr;
16mod pingmgr;
17mod syncmgr;
18
19#[cfg(test)]
20mod tests;
21
22use addrmgr::AddressManager;
23use cbfmgr::FilterManager;
24use invmgr::InventoryManager;
25use output::Outbox;
26use peermgr::PeerManager;
27use pingmgr::PingManager;
28use syncmgr::SyncManager;
29
30pub use addrmgr::Event as AddressEvent;
31pub use cbfmgr::Event as FilterEvent;
32pub use invmgr::Event as InventoryEvent;
33pub use peermgr::Event as PeerEvent;
34pub use pingmgr::Event as PingEvent;
35pub use syncmgr::Event as ChainEvent;
36
37use crate::stream;
38
39pub use event::Event;
40pub use nakamoto_net::Link;
41pub use output::Io;
42
43use std::borrow::Cow;
44use std::collections::{HashMap, HashSet};
45use std::fmt::{self, Debug};
46use std::net;
47use std::ops::{Bound, RangeInclusive};
48use std::sync::Arc;
49
50use nakamoto_common::bitcoin::blockdata::block::BlockHeader;
51use nakamoto_common::bitcoin::consensus::encode;
52use nakamoto_common::bitcoin::consensus::params::Params;
53use nakamoto_common::bitcoin::network::constants::ServiceFlags;
54use nakamoto_common::bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
55use nakamoto_common::bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory};
56use nakamoto_common::bitcoin::network::message_filter::GetCFilters;
57use nakamoto_common::bitcoin::network::message_network::VersionMessage;
58use nakamoto_common::bitcoin::network::Address;
59use nakamoto_common::bitcoin::Script;
60use nakamoto_common::block::filter::Filters;
61use nakamoto_common::block::time::AdjustedClock;
62use nakamoto_common::block::time::{LocalDuration, LocalTime};
63use nakamoto_common::block::tree::{self, BlockReader, BlockTree, ImportResult};
64use nakamoto_common::block::{BlockHash, Height};
65use nakamoto_common::block::{BlockTime, Transaction};
66use nakamoto_common::network;
67use nakamoto_common::nonempty::NonEmpty;
68use nakamoto_common::p2p::peer::AddressSource;
69use nakamoto_common::p2p::{peer, Domain};
70use nakamoto_net as traits;
71
72use thiserror::Error;
73
/// Peer-to-peer protocol version used by default (see `Config::default`).
pub const PROTOCOL_VERSION: u32 = 70016;
/// Minimum peer protocol version.
///
/// NOTE(review): not referenced in this part of the file; presumably enforced
/// during the handshake by the peer manager — confirm.
pub const MIN_PROTOCOL_VERSION: u32 = 70012;
/// User agent string used by default (see `Config::default`).
pub const USER_AGENT: &str = "/nakamoto:0.3.0/";

/// Size handed to `stream::Decoder::new` for every newly connected peer.
const INBOX_BUFFER_SIZE: usize = 1024 * 64;

/// A list of block hashes paired with a single block hash.
///
/// NOTE(review): this matches the `(locator_hashes, stop_hash)` tuple built when a
/// `getheaders` message is received — confirm that is the intended use.
type Locators = (Vec<BlockHash>, BlockHash);

/// Identifies a peer by its socket address.
pub type PeerId = net::SocketAddr;
90
/// A peer's socket address paired with a shared reference counter.
///
/// Cloning a `Socket` clones the inner [`Arc`], so [`Socket::refs`] reports how
/// many copies created from the same `Socket` are still alive.
#[derive(Clone, Debug, PartialEq, Eq)]
pub struct Socket {
    /// Socket address.
    pub addr: net::SocketAddr,
    /// Shared handle whose strong count tracks the live clones of this socket.
    refs: Arc<()>,
}

impl Socket {
    /// Create a new socket from anything convertible into a socket address.
    ///
    /// The returned value starts with a reference count of one.
    pub fn new(addr: impl Into<net::SocketAddr>) -> Self {
        let addr = addr.into();
        let refs = Arc::new(());

        Self { addr, refs }
    }

    /// Number of live `Socket` values sharing this socket's reference counter,
    /// including `self`.
    pub fn refs(&self) -> usize {
        let Self { refs, .. } = self;

        Arc::strong_count(refs)
    }
}

impl From<net::SocketAddr> for Socket {
    /// Wrap a socket address in a fresh `Socket` with its own reference counter.
    fn from(addr: net::SocketAddr) -> Self {
        Socket {
            addr,
            refs: Arc::new(()),
        }
    }
}
121
/// Reason a peer was, or should be, disconnected.
///
/// The short descriptions below mirror the `Display` implementation.
#[derive(Debug, Clone)]
pub enum DisconnectReason {
    /// Peer is misbehaving; carries a static description of the misbehavior.
    PeerMisbehaving(&'static str),
    /// Peer protocol version mismatch.
    /// NOTE(review): the `u32` is presumably the version the peer advertised — confirm.
    PeerProtocolVersion(u32),
    /// Peer doesn't have the required services.
    PeerServices(ServiceFlags),
    /// Peer is too far behind.
    /// NOTE(review): the height is presumably the peer's reported height — confirm.
    PeerHeight(Height),
    /// A message was received with an invalid network magic; carries that magic.
    PeerMagic(u32),
    /// Peer timed out; carries a static label for what timed out.
    PeerTimeout(&'static str),
    /// Peer dropped.
    PeerDropped,
    /// A self-connection was detected.
    SelfConnection,
    /// Inbound connection limit reached.
    ConnectionLimit,
    /// A message failed to decode.
    DecodeError(Arc<encode::Error>),
    /// Disconnect requested through an external command (`Command::Disconnect`).
    Command,
    /// Any other reason, described by a static string.
    Other(&'static str),
}
150
151impl DisconnectReason {
152 pub fn is_transient(&self) -> bool {
155 matches!(
156 self,
157 Self::ConnectionLimit | Self::PeerTimeout(_) | Self::PeerHeight(_)
158 )
159 }
160}
161
162impl From<DisconnectReason> for nakamoto_net::Disconnect<DisconnectReason> {
163 fn from(reason: DisconnectReason) -> Self {
164 Self::StateMachine(reason)
165 }
166}
167
168impl fmt::Display for DisconnectReason {
169 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170 match self {
171 Self::PeerMisbehaving(reason) => write!(f, "peer misbehaving: {}", reason),
172 Self::PeerProtocolVersion(_) => write!(f, "peer protocol version mismatch"),
173 Self::PeerServices(_) => write!(f, "peer doesn't have the required services"),
174 Self::PeerHeight(_) => write!(f, "peer is too far behind"),
175 Self::PeerMagic(magic) => write!(f, "received message with invalid magic: {}", magic),
176 Self::PeerTimeout(s) => write!(f, "peer timed out: {:?}", s),
177 Self::PeerDropped => write!(f, "peer dropped"),
178 Self::SelfConnection => write!(f, "detected self-connection"),
179 Self::ConnectionLimit => write!(f, "inbound connection limit reached"),
180 Self::DecodeError(err) => write!(f, "message decode error: {}", err),
181 Self::Command => write!(f, "received external command"),
182 Self::Other(reason) => write!(f, "{}", reason),
183 }
184 }
185}
186
/// Snapshot of a peer: details of its connection plus the information recorded
/// about it by the peer manager.
#[derive(Debug, Clone)]
pub struct Peer {
    /// Remote address of the peer.
    pub addr: net::SocketAddr,
    /// Local address of the connection.
    pub local_addr: net::SocketAddr,
    /// Link direction of the connection (see `Peer::is_outbound`).
    pub link: Link,
    /// Time recorded on the connection.
    /// NOTE(review): presumably when the connection was established — confirm.
    pub since: LocalTime,
    /// Block height recorded for the peer.
    pub height: Height,
    /// Services recorded for the peer.
    pub services: ServiceFlags,
    /// User agent string of the peer.
    pub user_agent: String,
    /// Relay flag recorded for the peer.
    /// NOTE(review): presumably whether the peer wants transactions relayed — confirm.
    pub relay: bool,
}
207
208impl Peer {
209 pub fn is_outbound(&self) -> bool {
211 self.link.is_outbound()
212 }
213}
214
215impl From<(&peermgr::PeerInfo, &peermgr::Connection)> for Peer {
216 fn from((peer, conn): (&peermgr::PeerInfo, &peermgr::Connection)) -> Self {
217 Self {
218 addr: conn.socket.addr,
219 local_addr: conn.local_addr,
220 link: conn.link,
221 since: conn.since,
222 height: peer.height,
223 services: peer.services,
224 user_agent: peer.user_agent.clone(),
225 relay: peer.relay,
226 }
227 }
228}
229
/// A command sent to the state machine and handled by `StateMachine::command`.
///
/// Variants carrying a `chan::Sender` use it to send back a reply.
#[derive(Clone)]
pub enum Command {
    /// Look up the block header at the given height in the block tree and reply
    /// with it, or with `None` if there is none.
    GetBlockByHeight(Height, chan::Sender<Option<BlockHeader>>),
    /// Reply with the negotiated peers that have the given services.
    GetPeers(ServiceFlags, chan::Sender<Vec<Peer>>),
    /// Reply with the height and header of the block tree's tip.
    GetTip(chan::Sender<(Height, BlockHeader)>),
    /// Ask the inventory manager to get the block with the given hash.
    GetBlock(BlockHash),
    /// Ask the filter manager for the compact filters in the given height range
    /// and reply with the outcome of the request.
    GetFilters(
        /// Range of block heights.
        RangeInclusive<Height>,
        /// Channel on which the outcome is sent.
        chan::Sender<Result<(), GetFiltersError>>,
    ),
    /// Rescan the chain for the given scripts; blocks reported by the filter
    /// manager's rescan are requested from the inventory manager.
    Rescan {
        /// Lower bound of the rescan.
        from: Bound<Height>,
        /// Upper bound of the rescan.
        to: Bound<Height>,
        /// Scripts to watch for.
        watch: Vec<Script>,
    },
    /// Hand the given scripts to the filter manager's watch list.
    Watch {
        /// Scripts to watch for.
        watch: Vec<Script>,
    },
    /// Send a message to every peer accepted by the predicate and reply with the
    /// ids of the peers it was sent to.
    Broadcast(NetworkMessage, fn(Peer) -> bool, chan::Sender<Vec<PeerId>>),
    /// Send a message to one randomly chosen negotiated outbound peer and reply
    /// with its address, or with `None` if there is no such peer.
    Query(NetworkMessage, chan::Sender<Option<net::SocketAddr>>),
    /// Call the given function with a reference to the block tree.
    QueryTree(Arc<dyn Fn(&dyn BlockReader) + Send + Sync>),
    /// Whitelist the given address and connect to it.
    Connect(net::SocketAddr),
    /// Disconnect from the given peer.
    Disconnect(net::SocketAddr),
    /// Import headers into the block tree and reply with the import result.
    ImportHeaders(
        /// Headers to import.
        Vec<BlockHeader>,
        /// Channel on which the import result is sent.
        chan::Sender<Result<ImportResult, tree::Error>>,
    ),
    /// Import addresses into the address manager.
    ImportAddresses(Vec<Address>),
    /// Announce a transaction through the inventory manager and reply with the
    /// peers it was announced to, or with an error if there were none.
    SubmitTransaction(
        /// Transaction to submit.
        Transaction,
        /// Channel on which the outcome is sent.
        chan::Sender<Result<NonEmpty<PeerId>, CommandError>>,
    ),
}
283
284impl fmt::Debug for Command {
285 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286 match self {
287 Self::GetBlockByHeight(height, _) => write!(f, "GetBlockByHeight({})", height),
288 Self::GetPeers(flags, _) => write!(f, "GetPeers({})", flags),
289 Self::GetTip(_) => write!(f, "GetTip"),
290 Self::GetBlock(hash) => write!(f, "GetBlock({})", hash),
291 Self::GetFilters(range, _) => write!(f, "GetFilters({:?})", range),
292 Self::Rescan { from, to, watch } => {
293 write!(f, "Rescan({:?}, {:?}, {:?})", from, to, watch)
294 }
295 Self::Watch { watch } => {
296 write!(f, "Watch({:?})", watch)
297 }
298 Self::Broadcast(msg, _, _) => write!(f, "Broadcast({})", msg.cmd()),
299 Self::Query(msg, _) => write!(f, "Query({})", msg.cmd()),
300 Self::QueryTree(_) => write!(f, "QueryTree"),
301 Self::Connect(addr) => write!(f, "Connect({})", addr),
302 Self::Disconnect(addr) => write!(f, "Disconnect({})", addr),
303 Self::ImportHeaders(_headers, _) => write!(f, "ImportHeaders(..)"),
304 Self::ImportAddresses(addrs) => write!(f, "ImportAddresses({:?})", addrs),
305 Self::SubmitTransaction(tx, _) => write!(f, "SubmitTransaction({:?})", tx),
306 }
307 }
308}
309
/// Error returned in reply to a command.
#[derive(Error, Debug)]
pub enum CommandError {
    /// Not connected to any peer with the required services.
    ///
    /// Sent in reply to `Command::SubmitTransaction` when the transaction was
    /// announced to no peer.
    #[error("not connected to any peer with the required services")]
    NotConnected,
}
317
318pub use cbfmgr::GetFiltersError;
319
/// User-supplied callbacks invoked by the state machine.
#[derive(Clone)]
pub struct Hooks {
    /// Called for every message received from a connected peer, before the
    /// message is processed. If it returns an error, the message is dropped.
    pub on_message:
        Arc<dyn Fn(PeerId, &NetworkMessage, &Outbox) -> Result<(), &'static str> + Send + Sync>,
    /// Called with a peer's `version` message.
    ///
    /// NOTE(review): not invoked in this file; the hooks are handed to the peer
    /// manager, which presumably calls this during the handshake — confirm.
    pub on_version: Arc<dyn Fn(PeerId, VersionMessage) -> Result<(), &'static str> + Send + Sync>,
    /// Called when a `getcfilters` message is received.
    pub on_getcfilters: Arc<dyn Fn(PeerId, GetCFilters, &Outbox) + Send + Sync>,
    /// Called when a `getdata` message is received, after the inventory manager
    /// has seen it.
    pub on_getdata: Arc<dyn Fn(PeerId, Vec<Inventory>, &Outbox) + Send + Sync>,
}
335
336impl Default for Hooks {
337 fn default() -> Self {
338 Self {
339 on_message: Arc::new(|_, _, _| Ok(())),
340 on_version: Arc::new(|_, _| Ok(())),
341 on_getcfilters: Arc::new(|_, _, _| {}),
342 on_getdata: Arc::new(|_, _, _| {}),
343 }
344 }
345}
346
347impl fmt::Debug for Hooks {
348 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349 f.debug_struct("Hooks").finish()
350 }
351}
352
/// The protocol state machine.
///
/// It owns the block tree and the individual sub-protocol managers, and
/// dispatches commands, peer messages and timer events to them. The `impl`
/// blocks bound the type parameters as follows: `T` is the block tree, `F` the
/// filter store, `P` the peer address store and `C` the clock.
#[derive(Debug)]
pub struct StateMachine<T, F, P, C> {
    /// Block tree.
    tree: T,
    /// Bitcoin network we are connected to; its magic is checked on every
    /// received message.
    network: network::Network,
    /// Per-peer stream decoders; an entry is inserted when a peer connects.
    inbox: HashMap<PeerId, stream::Decoder>,
    /// Peer address manager.
    addrmgr: AddressManager<P, Outbox, C>,
    /// Block header sync manager.
    syncmgr: SyncManager<Outbox, C>,
    /// Ping manager.
    pingmgr: PingManager<Outbox, C>,
    /// Compact block filter manager.
    cbfmgr: FilterManager<F, Outbox, C>,
    /// Peer connection manager.
    peermgr: PeerManager<Outbox, C>,
    /// Inventory manager.
    invmgr: InventoryManager<Outbox, C>,
    /// Clock; set on initialization and on every tick.
    clock: C,
    /// Time of the last periodic status log.
    // Only read and updated under `cfg(not(test))` in `timer_expired`, hence the
    // `dead_code` allowance for test builds.
    #[allow(dead_code)]
    last_tick: LocalTime,
    /// Random number generator; clones of it are handed to the managers.
    rng: fastrand::Rng,
    /// Outbound I/O queue, drained through the `Iterator` implementation.
    outbox: Outbox,
    /// User-supplied hooks.
    hooks: Hooks,
}
389
/// Configured limits of the state machine.
#[derive(Debug, Clone)]
pub struct Limits {
    /// Target number of outbound peers; passed to the peer manager as
    /// `target_outbound_peers`.
    pub max_outbound_peers: usize,
    /// Maximum number of inbound peers; passed to the peer manager.
    pub max_inbound_peers: usize,
    /// Size of the filter cache; passed to the filter manager.
    /// NOTE(review): the unit (bytes vs. entries) is not visible here — confirm.
    pub filter_cache_size: usize,
}
400
401impl Default for Limits {
402 fn default() -> Self {
403 Self {
404 max_outbound_peers: peermgr::TARGET_OUTBOUND_PEERS,
405 max_inbound_peers: peermgr::MAX_INBOUND_PEERS,
406 filter_cache_size: cbfmgr::DEFAULT_FILTER_CACHE_SIZE,
407 }
408 }
409}
410
/// State machine configuration.
#[derive(Debug, Clone)]
pub struct Config {
    /// Bitcoin network we are connecting to.
    pub network: network::Network,
    /// Peers to connect to; handed to the peer manager as its persistent peers.
    pub connect: Vec<net::SocketAddr>,
    /// Supported communication domains; handed to the peer and address managers.
    pub domains: Vec<Domain>,
    /// Services handed to the peer manager.
    /// NOTE(review): presumably the services we offer to peers — confirm.
    pub services: ServiceFlags,
    /// Services required of peers; handed to the peer and address managers.
    pub required_services: ServiceFlags,
    /// Peer whitelist; handed to the peer manager.
    pub whitelist: Whitelist,
    /// Consensus parameters; handed to the sync manager.
    pub params: Params,
    /// Our protocol version; used when constructing the state machine.
    pub protocol_version: u32,
    /// Our user agent; handed to the peer manager.
    pub user_agent: &'static str,
    /// Ping timeout; handed to the ping manager.
    pub ping_timeout: LocalDuration,
    /// User-supplied hooks.
    pub hooks: Hooks,
    /// Configured limits.
    pub limits: Limits,
}
439
440impl Default for Config {
441 fn default() -> Self {
442 Self {
443 network: network::Network::default(),
444 params: Params::new(network::Network::default().into()),
445 connect: Vec::new(),
446 domains: Domain::all(),
447 services: ServiceFlags::NONE,
448 required_services: ServiceFlags::NETWORK,
449 whitelist: Whitelist::default(),
450 protocol_version: PROTOCOL_VERSION,
451 ping_timeout: pingmgr::PING_TIMEOUT,
452 user_agent: USER_AGENT,
453 hooks: Hooks::default(),
454 limits: Limits::default(),
455 }
456 }
457}
458
459impl Config {
460 pub fn from(network: network::Network, connect: Vec<net::SocketAddr>) -> Self {
462 let params = Params::new(network.into());
463
464 Self {
465 network,
466 connect,
467 params,
468 ..Self::default()
469 }
470 }
471
472 pub fn port(&self) -> u16 {
474 self.network.port()
475 }
476}
477
/// Peer whitelist, matching peers by IP address or by user agent.
#[derive(Debug, Clone, Default)]
pub struct Whitelist {
    /// Whitelisted IP addresses.
    addr: HashSet<net::IpAddr>,
    /// Whitelisted user agents.
    user_agent: HashSet<String>,
}

impl Whitelist {
    /// Tell whether a peer is whitelisted.
    ///
    /// A peer matches when its address is whitelisted or when its user agent is
    /// an exact match for a whitelisted one.
    fn contains(&self, addr: &net::IpAddr, user_agent: &str) -> bool {
        if self.addr.contains(addr) {
            return true;
        }
        self.user_agent.contains(user_agent)
    }
}
492
493impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> StateMachine<T, F, P, C> {
494 pub fn new(
496 tree: T,
497 filters: F,
498 peers: P,
499 clock: C,
500 rng: fastrand::Rng,
501 config: Config,
502 ) -> Self {
503 let Config {
504 network,
505 connect,
506 domains,
507 services,
508 whitelist,
509 protocol_version,
510 ping_timeout,
511 user_agent,
512 required_services,
513 params,
514 hooks,
515 limits,
516 } = config;
517
518 let outbox = Outbox::new(network, protocol_version);
519 let inbox = HashMap::new();
520 let syncmgr = SyncManager::new(
521 syncmgr::Config {
522 max_message_headers: syncmgr::MAX_MESSAGE_HEADERS,
523 request_timeout: syncmgr::REQUEST_TIMEOUT,
524 params,
525 },
526 rng.clone(),
527 outbox.clone(),
528 clock.clone(),
529 );
530 let pingmgr = PingManager::new(ping_timeout, rng.clone(), outbox.clone(), clock.clone());
531 let cbfmgr = FilterManager::new(
532 cbfmgr::Config {
533 filter_cache_size: limits.filter_cache_size,
534 ..cbfmgr::Config::default()
535 },
536 rng.clone(),
537 filters,
538 outbox.clone(),
539 clock.clone(),
540 );
541 let peermgr = PeerManager::new(
542 peermgr::Config {
543 protocol_version: PROTOCOL_VERSION,
544 whitelist,
545 persistent: connect,
546 domains: domains.clone(),
547 target_outbound_peers: limits.max_outbound_peers,
548 max_inbound_peers: limits.max_inbound_peers,
549 retry_max_wait: LocalDuration::from_mins(60),
550 retry_min_wait: LocalDuration::from_secs(1),
551 required_services,
552 preferred_services: syncmgr::REQUIRED_SERVICES | cbfmgr::REQUIRED_SERVICES,
553 services,
554 user_agent,
555 },
556 rng.clone(),
557 hooks.clone(),
558 outbox.clone(),
559 clock.clone(),
560 );
561 let addrmgr = AddressManager::new(
562 addrmgr::Config {
563 required_services,
564 domains,
565 },
566 rng.clone(),
567 peers,
568 outbox.clone(),
569 clock.clone(),
570 );
571 let invmgr = InventoryManager::new(rng.clone(), outbox.clone(), clock.clone());
572
573 Self {
574 tree,
575 network,
576 clock,
577 inbox,
578 addrmgr,
579 syncmgr,
580 pingmgr,
581 cbfmgr,
582 peermgr,
583 invmgr,
584 last_tick: LocalTime::default(),
585 rng,
586 outbox,
587 hooks,
588 }
589 }
590
591 pub fn disconnect(&mut self, addr: PeerId, reason: DisconnectReason) {
593 self.peermgr.disconnect(addr, reason);
598 }
599
600 pub fn drain(&mut self) -> Box<dyn Iterator<Item = output::Io> + '_> {
602 Box::new(std::iter::from_fn(|| self.next()))
603 }
604
605 fn broadcast<Q>(&mut self, msg: NetworkMessage, predicate: Q) -> Vec<PeerId>
607 where
608 Q: Fn(&Peer) -> bool,
609 {
610 let mut peers = Vec::new();
611
612 for peer in self.peermgr.peers().map(Peer::from) {
613 if predicate(&peer) {
614 peers.push(peer.addr);
615 self.outbox.message(peer.addr, msg.clone());
616 }
617 }
618 peers
619 }
620
621 fn query<Q>(&mut self, msg: NetworkMessage, f: Q) -> Option<PeerId>
623 where
624 Q: Fn(&Peer) -> bool,
625 {
626 let peers = self
627 .peermgr
628 .negotiated(Link::Outbound)
629 .map(Peer::from)
630 .filter(f)
631 .collect::<Vec<_>>();
632
633 match peers.len() {
634 n if n > 0 => {
635 let r = self.rng.usize(..n);
636 let p = peers.get(r).unwrap();
637
638 self.outbox.message(p.addr, msg);
639
640 Some(p.addr)
641 }
642 _ => None,
643 }
644 }
645}
646
647impl<T, F, P, C> Iterator for StateMachine<T, F, P, C> {
648 type Item = output::Io;
649
650 fn next(&mut self) -> Option<output::Io> {
651 self.outbox.next()
652 }
653}
654
655impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> StateMachine<T, F, P, C> {
656 pub fn command(&mut self, cmd: Command) {
658 debug!(target: "p2p", "Received command: {:?}", cmd);
659
660 match cmd {
661 Command::QueryTree(query) => {
662 query(&self.tree);
663 }
664 Command::GetBlockByHeight(height, reply) => {
665 let header = self.tree.get_block_by_height(height).map(|h| h.to_owned());
666
667 reply.send(header).ok();
668 }
669 Command::GetPeers(services, reply) => {
670 let peers = self
671 .peermgr
672 .peers()
673 .filter(|(p, _)| p.is_negotiated())
674 .filter(|(p, _)| p.services.has(services))
675 .map(Peer::from)
676 .collect::<Vec<Peer>>();
677
678 reply.send(peers).ok();
679 }
680 Command::Connect(addr) => {
681 self.peermgr.whitelist(addr);
682 self.peermgr.connect(&addr);
683 }
684 Command::Disconnect(addr) => {
685 self.disconnect(addr, DisconnectReason::Command);
686 }
687 Command::Query(msg, reply) => {
688 reply.send(self.query(msg, |_| true)).ok();
689 }
690 Command::Broadcast(msg, predicate, reply) => {
691 let peers = self.broadcast(msg, |p| predicate(p.clone()));
692 reply.send(peers).ok();
693 }
694 Command::ImportHeaders(headers, reply) => {
695 let result = self
696 .syncmgr
697 .import_blocks(headers.into_iter(), &mut self.tree);
698
699 match result {
700 Ok(import_result) => {
701 reply.send(Ok(import_result)).ok();
702 }
703 Err(err) => {
704 reply.send(Err(err)).ok();
705 }
706 }
707 }
708 Command::ImportAddresses(addrs) => {
709 self.addrmgr.insert(
710 addrs.into_iter().map(|a| (BlockTime::default(), a)),
712 peer::Source::Imported,
713 );
714 }
715 Command::GetTip(reply) => {
716 let (_, header) = self.tree.tip();
717 let height = self.tree.height();
718
719 reply.send((height, header)).ok();
720 }
721 Command::GetFilters(range, reply) => {
722 let result = self.cbfmgr.get_cfilters(range, &self.tree);
723 reply.send(result).ok();
724 }
725 Command::GetBlock(hash) => {
726 self.invmgr.get_block(hash);
727 }
728 Command::SubmitTransaction(tx, reply) => {
729 self.cbfmgr.watch_transaction(&tx);
736
737 let peers = self.invmgr.announce(tx);
739
740 if let Some(peers) = NonEmpty::from_vec(peers) {
741 reply.send(Ok(peers)).ok();
742 } else {
743 reply.send(Err(CommandError::NotConnected)).ok();
744 }
745 }
746 Command::Rescan { from, to, watch } => {
747 for (_, hash) in self.cbfmgr.rescan(from, to, watch, &self.tree) {
749 self.invmgr.get_block(hash);
750 }
751 }
752 Command::Watch { watch } => {
753 self.cbfmgr.watch(watch);
754 }
755 }
756 }
757}
758
759impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits::StateMachine
760 for StateMachine<T, F, P, C>
761{
762 type Message = RawNetworkMessage;
763 type Event = Event;
764 type DisconnectReason = DisconnectReason;
765
766 fn initialize(&mut self, time: LocalTime) {
767 self.clock.set(time);
768 self.outbox.event(Event::Initializing);
769 self.addrmgr.initialize();
770 self.syncmgr.initialize(&self.tree);
771 self.peermgr.initialize(&mut self.addrmgr);
772 self.cbfmgr.initialize(&self.tree);
773 self.outbox.event(Event::Ready {
774 height: self.tree.height(),
775 filter_height: self.cbfmgr.filters.height(),
776 time,
777 });
778 }
779
    /// Handle a message received from a peer.
    ///
    /// The message is checked in three steps before being dispatched:
    ///
    /// 1. its magic must match the configured network, otherwise the peer is
    ///    disconnected;
    /// 2. the peer must be known to the peer manager as connected, otherwise the
    ///    message is ignored;
    /// 3. the user `on_message` hook must accept it, otherwise it is dropped.
    ///
    /// The payload is then routed to the manager(s) responsible for it.
    fn message_received(&mut self, addr: &net::SocketAddr, msg: Cow<RawNetworkMessage>) {
        let now = self.clock.local_time();
        let cmd = msg.cmd();
        let addr = *addr;
        let msg = msg.into_owned();

        // A message with the wrong magic gets the sender disconnected.
        if msg.magic != self.network.magic() {
            return self.disconnect(addr, DisconnectReason::PeerMagic(msg.magic));
        }

        // Messages from peers the peer manager doesn't consider connected are ignored.
        if !self.peermgr.is_connected(&addr) {
            debug!(target: "p2p", "Received {:?} from unknown peer {}", cmd, addr);
            return;
        }

        debug!(target: "p2p", "Received {:?} from {}", cmd, addr);

        // Give the user hook a chance to reject the message before any manager sees it.
        if let Err(err) = (self.hooks.on_message)(addr, &msg.payload, &self.outbox) {
            debug!(
                target: "p2p",
                "Message {:?} from {} dropped by user hook: {}",
                cmd, addr, err
            );
            return;
        }

        match msg.payload {
            NetworkMessage::Version(msg) => {
                let height = self.tree.height();

                self.peermgr
                    .received_version(&addr, msg, height, &mut self.addrmgr);
            }
            NetworkMessage::Verack => {
                // Once the peer manager reports the peer as negotiated, every
                // other manager is told about it.
                if let Some((peer, conn)) = self.peermgr.received_verack(&addr, now) {
                    self.clock.record_offset(conn.socket.addr, peer.time_offset);
                    self.addrmgr
                        .peer_negotiated(&addr, peer.services, conn.link);
                    self.pingmgr.peer_negotiated(conn.socket.addr);
                    self.cbfmgr.peer_negotiated(
                        conn.socket.clone(),
                        peer.height,
                        peer.services,
                        conn.link,
                        peer.persistent,
                        &self.tree,
                    );
                    self.syncmgr.peer_negotiated(
                        conn.socket.clone(),
                        peer.height,
                        peer.services,
                        // True when the peer lacks the services required by the
                        // filter manager.
                        // NOTE(review): the meaning of this flag on the sync
                        // manager's side is not visible here — confirm.
                        !peer.services.has(cbfmgr::REQUIRED_SERVICES),
                        conn.link,
                        &self.tree,
                    );
                    self.invmgr.peer_negotiated(
                        conn.socket,
                        peer.services,
                        peer.relay,
                        peer.wtxidrelay,
                    );
                }
            }
            NetworkMessage::Ping(nonce) => {
                // The peer is marked active only if the ping manager accepted the ping.
                if self.pingmgr.received_ping(addr, nonce) {
                    self.addrmgr.peer_active(addr);
                }
            }
            NetworkMessage::Pong(nonce) => {
                // The peer is marked active only if the ping manager accepted the pong.
                if self.pingmgr.received_pong(addr, nonce, now) {
                    self.addrmgr.peer_active(addr);
                }
            }
            NetworkMessage::Headers(headers) => {
                match self
                    .syncmgr
                    .received_headers(&addr, headers, &self.clock, &mut self.tree)
                {
                    // NOTE(review): unlike the other log calls in this function,
                    // this one has no `target: "p2p"` — confirm whether intended.
                    Err(e) => log::error!("Error receiving headers: {}", e),
                    Ok(ImportResult::TipChanged(_, _, _, reverted, _)) => {
                        // A non-empty `reverted` list means blocks were reverted.
                        // NOTE(review): the last entry is presumably the lowest
                        // reverted block — confirm the ordering.
                        if let Some((height, _)) = reverted.last() {
                            // Filters are rolled back to the block just below it.
                            let fork_height = height - 1;
                            // NOTE(review): this panics if the filter rollback fails.
                            self.cbfmgr.rollback(fork_height).unwrap();

                            // Transactions returned for each reverted block are
                            // handed back to the filter manager's watch list.
                            for (height, _) in reverted {
                                for tx in self.invmgr.block_reverted(height) {
                                    self.cbfmgr.watch_transaction(&tx);
                                }
                            }
                        }
                        // The tip changed: trigger a filter sync against the tree.
                        self.cbfmgr.sync(&self.tree);
                    }
                    _ => {}
                }
            }
            NetworkMessage::GetHeaders(GetHeadersMessage {
                locator_hashes,
                stop_hash,
                ..
            }) => {
                self.syncmgr
                    .received_getheaders(&addr, (locator_hashes, stop_hash), &self.tree);
            }
            NetworkMessage::Block(block) => {
                // Transactions reported back for the block no longer need watching.
                for confirmed in self.invmgr.received_block(&addr, block, &self.tree) {
                    self.cbfmgr.unwatch_transaction(&confirmed);
                }
            }
            NetworkMessage::Inv(inventory) => {
                self.syncmgr.received_inv(addr, inventory, &self.tree);
            }
            NetworkMessage::CFHeaders(msg) => {
                match self.cbfmgr.received_cfheaders(&addr, msg, &self.tree) {
                    // Invalid messages get the peer disconnected; other errors are logged.
                    Err(cbfmgr::Error::InvalidMessage { reason, .. }) => {
                        self.disconnect(addr, DisconnectReason::PeerMisbehaving(reason))
                    }
                    Err(err) => {
                        log::warn!(target: "p2p", "Error receiving filter headers: {}", err);
                    }
                    Ok(_) => {}
                }
            }
            NetworkMessage::GetCFHeaders(msg) => {
                match self.cbfmgr.received_getcfheaders(&addr, msg, &self.tree) {
                    Err(cbfmgr::Error::InvalidMessage { reason, .. }) => {
                        self.disconnect(addr, DisconnectReason::PeerMisbehaving(reason))
                    }
                    // Success and all other errors are silently ignored.
                    _ => {}
                }
            }
            NetworkMessage::CFilter(msg) => {
                match self.cbfmgr.received_cfilter(&addr, msg, &self.tree) {
                    Ok(matches) => {
                        // Every block returned by the filter manager is requested.
                        for (_, hash) in matches {
                            self.invmgr.get_block(hash);
                        }
                    }
                    Err(cbfmgr::Error::InvalidMessage { reason, .. }) => {
                        self.disconnect(addr, DisconnectReason::PeerMisbehaving(reason))
                    }
                    Err(cbfmgr::Error::Ignored { .. } | cbfmgr::Error::Filters { .. }) => {}
                }
            }
            NetworkMessage::GetCFilters(msg) => {
                // Filter requests are handled entirely by the user hook.
                (*self.hooks.on_getcfilters)(addr, msg, &self.outbox);
            }
            NetworkMessage::Addr(addrs) => {
                self.addrmgr.received_addr(addr, addrs);
            }
            NetworkMessage::GetAddr => {
                self.addrmgr.received_getaddr(&addr);
            }
            NetworkMessage::GetData(invs) => {
                // The inventory manager sees the request first, then the user hook.
                self.invmgr.received_getdata(addr, &invs);
                (*self.hooks.on_getdata)(addr, invs, &self.outbox);
            }
            NetworkMessage::WtxidRelay => {
                self.peermgr.received_wtxidrelay(&addr);
            }
            NetworkMessage::SendHeaders => {
                // Accepted without any action, and without the warning logged for
                // other unhandled messages.
            }
            NetworkMessage::Unknown {
                command: ref cmd, ..
            } => {
                warn!(target: "p2p", "Ignoring unknown message {:?} from {}", cmd, addr)
            }
            _ => {
                warn!(target: "p2p", "Ignoring {:?} from {}", cmd, addr);
            }
        }
    }
963
964 fn attempted(&mut self, addr: &net::SocketAddr) {
965 self.addrmgr.peer_attempted(addr);
966 self.peermgr.peer_attempted(addr);
967 }
968
969 fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
970 let height = self.tree.height();
971
972 self.addrmgr.record_local_address(*local_addr);
973 self.addrmgr.peer_connected(&addr);
974 self.peermgr.peer_connected(addr, *local_addr, link, height);
975 self.inbox
976 .insert(addr, stream::Decoder::new(INBOX_BUFFER_SIZE));
977 }
978
979 fn disconnected(
980 &mut self,
981 addr: &net::SocketAddr,
982 reason: nakamoto_net::Disconnect<DisconnectReason>,
983 ) {
984 self.cbfmgr.peer_disconnected(addr);
985 self.syncmgr.peer_disconnected(addr);
986 self.addrmgr.peer_disconnected(addr, reason.clone());
987 self.pingmgr.peer_disconnected(addr);
988 self.peermgr
989 .peer_disconnected(addr, &mut self.addrmgr, reason);
990 self.invmgr.peer_disconnected(addr);
991 }
992
993 fn tick(&mut self, local_time: LocalTime) {
994 trace!("Received tick");
995
996 self.clock.set(local_time);
997 }
998
999 fn timer_expired(&mut self) {
1000 trace!("Received wake");
1001
1002 self.invmgr.received_wake(&self.tree);
1003 self.syncmgr.received_wake(&self.tree);
1004 self.pingmgr.received_wake();
1005 self.addrmgr.received_wake();
1006 self.peermgr.received_wake(&mut self.addrmgr);
1007 self.cbfmgr.received_wake(&self.tree);
1008
1009 #[cfg(not(test))]
1010 let local_time = self.clock.local_time();
1011 #[cfg(not(test))]
1012 if local_time - self.last_tick >= LocalDuration::from_secs(10) {
1013 let (tip, _) = self.tree.tip();
1014 let height = self.tree.height();
1015 let best = self
1016 .syncmgr
1017 .best_height()
1018 .unwrap_or_else(|| self.tree.height());
1019 let sync = if best > 0 {
1020 height as f64 / best as f64 * 100.
1021 } else {
1022 0.
1023 };
1024 let outbound = self.peermgr.negotiated(Link::Outbound).count();
1025 let inbound = self.peermgr.negotiated(Link::Inbound).count();
1026 let connecting = self.peermgr.connecting().count();
1027 let target = self.peermgr.config.target_outbound_peers;
1028 let max_inbound = self.peermgr.config.max_inbound_peers;
1029 let addresses = self.addrmgr.len();
1030 let preferred = self
1031 .peermgr
1032 .negotiated(Link::Outbound)
1033 .filter(|(p, _)| p.services.has(self.peermgr.config.preferred_services))
1034 .count();
1035
1036 let mut msg = Vec::new();
1042
1043 msg.push(format!("tip = {}", tip));
1044 msg.push(format!("headers = {}/{} ({:.1}%)", height, best, sync));
1045 msg.push(format!(
1046 "cfheaders = {}/{}",
1047 self.cbfmgr.filters.height(),
1048 height
1049 ));
1050 msg.push(format!("inbound = {}/{}", inbound, max_inbound));
1051 msg.push(format!(
1052 "outbound = {}/{} ({})",
1053 outbound, target, preferred,
1054 ));
1055 msg.push(format!("connecting = {}/{}", connecting, target));
1056 msg.push(format!("addresses = {}", addresses));
1057
1058 log::info!(target: "p2p", "{}", msg.join(", "));
1059
1060 if self.cbfmgr.rescan.active {
1061 let rescan = &self.cbfmgr.rescan;
1062 log::info!(target: "p2p", "{}", rescan.info());
1063 }
1064 log::info!(
1065 target: "p2p",
1066 "inventory block queue = {}, requested = {}, mempool = {}",
1067 self.invmgr.received.len(),
1068 self.invmgr.remaining.len(),
1069 self.invmgr.mempool.len(),
1070 );
1071
1072 self.last_tick = local_time;
1073 }
1074 }
1075}