nakamoto_p2p/
fsm.rs

1//! Bitcoin protocol state machine.
2#![warn(missing_docs)]
3use crossbeam_channel as chan;
4use log::*;
5
6pub mod event;
7pub mod fees;
8pub mod filter_cache;
9pub mod output;
10
11// Sub-protocols.
12mod addrmgr;
13mod cbfmgr;
14mod invmgr;
15mod peermgr;
16mod pingmgr;
17mod syncmgr;
18
19#[cfg(test)]
20mod tests;
21
22use addrmgr::AddressManager;
23use cbfmgr::FilterManager;
24use invmgr::InventoryManager;
25use output::Outbox;
26use peermgr::PeerManager;
27use pingmgr::PingManager;
28use syncmgr::SyncManager;
29
30pub use addrmgr::Event as AddressEvent;
31pub use cbfmgr::Event as FilterEvent;
32pub use invmgr::Event as InventoryEvent;
33pub use peermgr::Event as PeerEvent;
34pub use pingmgr::Event as PingEvent;
35pub use syncmgr::Event as ChainEvent;
36
37use crate::stream;
38
39pub use event::Event;
40pub use nakamoto_net::Link;
41pub use output::Io;
42
43use std::borrow::Cow;
44use std::collections::{HashMap, HashSet};
45use std::fmt::{self, Debug};
46use std::net;
47use std::ops::{Bound, RangeInclusive};
48use std::sync::Arc;
49
50use nakamoto_common::bitcoin::blockdata::block::BlockHeader;
51use nakamoto_common::bitcoin::consensus::encode;
52use nakamoto_common::bitcoin::consensus::params::Params;
53use nakamoto_common::bitcoin::network::constants::ServiceFlags;
54use nakamoto_common::bitcoin::network::message::{NetworkMessage, RawNetworkMessage};
55use nakamoto_common::bitcoin::network::message_blockdata::{GetHeadersMessage, Inventory};
56use nakamoto_common::bitcoin::network::message_filter::GetCFilters;
57use nakamoto_common::bitcoin::network::message_network::VersionMessage;
58use nakamoto_common::bitcoin::network::Address;
59use nakamoto_common::bitcoin::Script;
60use nakamoto_common::block::filter::Filters;
61use nakamoto_common::block::time::AdjustedClock;
62use nakamoto_common::block::time::{LocalDuration, LocalTime};
63use nakamoto_common::block::tree::{self, BlockReader, BlockTree, ImportResult};
64use nakamoto_common::block::{BlockHash, Height};
65use nakamoto_common::block::{BlockTime, Transaction};
66use nakamoto_common::network;
67use nakamoto_common::nonempty::NonEmpty;
68use nakamoto_common::p2p::peer::AddressSource;
69use nakamoto_common::p2p::{peer, Domain};
70use nakamoto_net as traits;
71
72use thiserror::Error;
73
74/// Peer-to-peer protocol version.
75pub const PROTOCOL_VERSION: u32 = 70016;
76/// Minimum supported peer protocol version.
77/// This version includes support for the `sendheaders` feature.
78pub const MIN_PROTOCOL_VERSION: u32 = 70012;
79/// User agent included in `version` messages.
80pub const USER_AGENT: &str = "/nakamoto:0.3.0/";
81
82/// Starting size of peer inbox buffer.
83const INBOX_BUFFER_SIZE: usize = 1024 * 64;
84
85/// Block locators. Consists of starting hashes and a stop hash.
86type Locators = (Vec<BlockHash>, BlockHash);
87
88/// Identifies a peer.
89pub type PeerId = net::SocketAddr;
90
91/// Reference counting virtual socket.
92/// When there are no more references held, this peer can be dropped.
93#[derive(Clone, Debug, PartialEq, Eq)]
94pub struct Socket {
95    /// Socket address.
96    pub addr: net::SocketAddr,
97    /// Reference counter.
98    refs: Arc<()>,
99}
100
101impl Socket {
102    /// Create a new virtual socket.
103    pub fn new(addr: impl Into<net::SocketAddr>) -> Self {
104        Self {
105            addr: addr.into(),
106            refs: Arc::new(()),
107        }
108    }
109
110    /// Get the number of references to this virtual socket.
111    pub fn refs(&self) -> usize {
112        Arc::strong_count(&self.refs)
113    }
114}
115
116impl From<net::SocketAddr> for Socket {
117    fn from(addr: net::SocketAddr) -> Self {
118        Self::new(addr)
119    }
120}
121
122/// Disconnect reason.
123#[derive(Debug, Clone)]
124pub enum DisconnectReason {
125    /// Peer is misbehaving.
126    PeerMisbehaving(&'static str),
127    /// Peer protocol version is too old or too recent.
128    PeerProtocolVersion(u32),
129    /// Peer doesn't have the required services.
130    PeerServices(ServiceFlags),
131    /// Peer chain is too far behind.
132    PeerHeight(Height),
133    /// Peer magic is invalid.
134    PeerMagic(u32),
135    /// Peer timed out.
136    PeerTimeout(&'static str),
137    /// Peer was dropped by all sub-protocols.
138    PeerDropped,
139    /// Connection to self was detected.
140    SelfConnection,
141    /// Inbound connection limit reached.
142    ConnectionLimit,
143    /// Error trying to decode incoming message.
144    DecodeError(Arc<encode::Error>),
145    /// Peer was forced to disconnect by external command.
146    Command,
147    /// Peer was disconnected for another reason.
148    Other(&'static str),
149}
150
151impl DisconnectReason {
152    /// Check whether the disconnect reason is transient, ie. may no longer be applicable
153    /// after some time.
154    pub fn is_transient(&self) -> bool {
155        matches!(
156            self,
157            Self::ConnectionLimit | Self::PeerTimeout(_) | Self::PeerHeight(_)
158        )
159    }
160}
161
162impl From<DisconnectReason> for nakamoto_net::Disconnect<DisconnectReason> {
163    fn from(reason: DisconnectReason) -> Self {
164        Self::StateMachine(reason)
165    }
166}
167
168impl fmt::Display for DisconnectReason {
169    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
170        match self {
171            Self::PeerMisbehaving(reason) => write!(f, "peer misbehaving: {}", reason),
172            Self::PeerProtocolVersion(_) => write!(f, "peer protocol version mismatch"),
173            Self::PeerServices(_) => write!(f, "peer doesn't have the required services"),
174            Self::PeerHeight(_) => write!(f, "peer is too far behind"),
175            Self::PeerMagic(magic) => write!(f, "received message with invalid magic: {}", magic),
176            Self::PeerTimeout(s) => write!(f, "peer timed out: {:?}", s),
177            Self::PeerDropped => write!(f, "peer dropped"),
178            Self::SelfConnection => write!(f, "detected self-connection"),
179            Self::ConnectionLimit => write!(f, "inbound connection limit reached"),
180            Self::DecodeError(err) => write!(f, "message decode error: {}", err),
181            Self::Command => write!(f, "received external command"),
182            Self::Other(reason) => write!(f, "{}", reason),
183        }
184    }
185}
186
187/// A remote peer.
188#[derive(Debug, Clone)]
189pub struct Peer {
190    /// Peer address.
191    pub addr: net::SocketAddr,
192    /// Local peer address.
193    pub local_addr: net::SocketAddr,
194    /// Whether this is an inbound or outbound peer connection.
195    pub link: Link,
196    /// Connected since this time.
197    pub since: LocalTime,
198    /// The peer's best height.
199    pub height: Height,
200    /// The peer's services.
201    pub services: ServiceFlags,
202    /// Peer user agent string.
203    pub user_agent: String,
204    /// Whether this peer relays transactions.
205    pub relay: bool,
206}
207
208impl Peer {
209    /// Check if this is an outbound peer.
210    pub fn is_outbound(&self) -> bool {
211        self.link.is_outbound()
212    }
213}
214
215impl From<(&peermgr::PeerInfo, &peermgr::Connection)> for Peer {
216    fn from((peer, conn): (&peermgr::PeerInfo, &peermgr::Connection)) -> Self {
217        Self {
218            addr: conn.socket.addr,
219            local_addr: conn.local_addr,
220            link: conn.link,
221            since: conn.since,
222            height: peer.height,
223            services: peer.services,
224            user_agent: peer.user_agent.clone(),
225            relay: peer.relay,
226        }
227    }
228}
229
230/// A command or request that can be sent to the protocol.
231#[derive(Clone)]
232pub enum Command {
233    /// Get block header at height.
234    GetBlockByHeight(Height, chan::Sender<Option<BlockHeader>>),
235    /// Get connected peers.
236    GetPeers(ServiceFlags, chan::Sender<Vec<Peer>>),
237    /// Get the tip of the active chain.
238    GetTip(chan::Sender<(Height, BlockHeader)>),
239    /// Get a block from the active chain.
240    GetBlock(BlockHash),
241    /// Get block filters.
242    GetFilters(
243        RangeInclusive<Height>,
244        chan::Sender<Result<(), GetFiltersError>>,
245    ),
246    /// Rescan the chain for matching scripts and addresses.
247    Rescan {
248        /// Start scan from this height. If unbounded, start at the current height.
249        from: Bound<Height>,
250        /// Stop scanning at this height. If unbounded, don't stop scanning.
251        to: Bound<Height>,
252        /// Scripts to match on.
253        watch: Vec<Script>,
254    },
255    /// Update the watchlist with the provided scripts.
256    Watch {
257        /// Scripts to watch.
258        watch: Vec<Script>,
259    },
260    /// Broadcast to peers matching the predicate.
261    Broadcast(NetworkMessage, fn(Peer) -> bool, chan::Sender<Vec<PeerId>>),
262    /// Send a message to a random peer.
263    Query(NetworkMessage, chan::Sender<Option<net::SocketAddr>>),
264    /// Query the block tree.
265    QueryTree(Arc<dyn Fn(&dyn BlockReader) + Send + Sync>),
266    /// Connect to a peer.
267    Connect(net::SocketAddr),
268    /// Disconnect from a peer.
269    Disconnect(net::SocketAddr),
270    /// Import headers directly into the block store.
271    ImportHeaders(
272        Vec<BlockHeader>,
273        chan::Sender<Result<ImportResult, tree::Error>>,
274    ),
275    /// Import addresses into the address book.
276    ImportAddresses(Vec<Address>),
277    /// Submit a transaction to the network.
278    SubmitTransaction(
279        Transaction,
280        chan::Sender<Result<NonEmpty<PeerId>, CommandError>>,
281    ),
282}
283
284impl fmt::Debug for Command {
285    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
286        match self {
287            Self::GetBlockByHeight(height, _) => write!(f, "GetBlockByHeight({})", height),
288            Self::GetPeers(flags, _) => write!(f, "GetPeers({})", flags),
289            Self::GetTip(_) => write!(f, "GetTip"),
290            Self::GetBlock(hash) => write!(f, "GetBlock({})", hash),
291            Self::GetFilters(range, _) => write!(f, "GetFilters({:?})", range),
292            Self::Rescan { from, to, watch } => {
293                write!(f, "Rescan({:?}, {:?}, {:?})", from, to, watch)
294            }
295            Self::Watch { watch } => {
296                write!(f, "Watch({:?})", watch)
297            }
298            Self::Broadcast(msg, _, _) => write!(f, "Broadcast({})", msg.cmd()),
299            Self::Query(msg, _) => write!(f, "Query({})", msg.cmd()),
300            Self::QueryTree(_) => write!(f, "QueryTree"),
301            Self::Connect(addr) => write!(f, "Connect({})", addr),
302            Self::Disconnect(addr) => write!(f, "Disconnect({})", addr),
303            Self::ImportHeaders(_headers, _) => write!(f, "ImportHeaders(..)"),
304            Self::ImportAddresses(addrs) => write!(f, "ImportAddresses({:?})", addrs),
305            Self::SubmitTransaction(tx, _) => write!(f, "SubmitTransaction({:?})", tx),
306        }
307    }
308}
309
310/// A generic error resulting from processing a [`Command`].
311#[derive(Error, Debug)]
312pub enum CommandError {
313    /// Not connected to any peer with the required services.
314    #[error("not connected to any peer with the required services")]
315    NotConnected,
316}
317
318pub use cbfmgr::GetFiltersError;
319
320/// Holds functions that are used to hook into or alter protocol behavior.
321#[derive(Clone)]
322pub struct Hooks {
323    /// Called when we receive a message from a peer.
324    /// If an error is returned, the message is not further processed.
325    pub on_message:
326        Arc<dyn Fn(PeerId, &NetworkMessage, &Outbox) -> Result<(), &'static str> + Send + Sync>,
327    /// Called when a `version` message is received.
328    /// If an error is returned, the peer is dropped, and the error is logged.
329    pub on_version: Arc<dyn Fn(PeerId, VersionMessage) -> Result<(), &'static str> + Send + Sync>,
330    /// Called when a `getcfilters` message is received.
331    pub on_getcfilters: Arc<dyn Fn(PeerId, GetCFilters, &Outbox) + Send + Sync>,
332    /// Called when a `getdata` message is received.
333    pub on_getdata: Arc<dyn Fn(PeerId, Vec<Inventory>, &Outbox) + Send + Sync>,
334}
335
336impl Default for Hooks {
337    fn default() -> Self {
338        Self {
339            on_message: Arc::new(|_, _, _| Ok(())),
340            on_version: Arc::new(|_, _| Ok(())),
341            on_getcfilters: Arc::new(|_, _, _| {}),
342            on_getdata: Arc::new(|_, _, _| {}),
343        }
344    }
345}
346
347impl fmt::Debug for Hooks {
348    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
349        f.debug_struct("Hooks").finish()
350    }
351}
352
353///////////////////////////////////////////////////////////////////////////////////////////////
354
355/// An instance of the Bitcoin P2P network protocol. Parametrized over the
356/// block-tree and compact filter store.
357#[derive(Debug)]
358pub struct StateMachine<T, F, P, C> {
359    /// Block tree.
360    tree: T,
361    /// Bitcoin network we're connecting to.
362    network: network::Network,
363    /// Peer message inboxes.
364    inbox: HashMap<PeerId, stream::Decoder>,
365    /// Peer address manager.
366    addrmgr: AddressManager<P, Outbox, C>,
367    /// Blockchain synchronization manager.
368    syncmgr: SyncManager<Outbox, C>,
369    /// Ping manager.
370    pingmgr: PingManager<Outbox, C>,
371    /// CBF (Compact Block Filter) manager.
372    cbfmgr: FilterManager<F, Outbox, C>,
373    /// Peer manager.
374    peermgr: PeerManager<Outbox, C>,
375    /// Inventory manager.
376    invmgr: InventoryManager<Outbox, C>,
377    /// Network-adjusted clock.
378    clock: C,
379    /// Last time a "tick" was triggered.
380    #[allow(dead_code)]
381    last_tick: LocalTime,
382    /// Random number generator.
383    rng: fastrand::Rng,
384    /// Outbound I/O. Used to communicate protocol events with a reactor.
385    outbox: Outbox,
386    /// State machine event hooks.
387    hooks: Hooks,
388}
389
390/// Configured limits.
391#[derive(Debug, Clone)]
392pub struct Limits {
393    /// Target outbound peer connections.
394    pub max_outbound_peers: usize,
395    /// Maximum inbound peer connections.
396    pub max_inbound_peers: usize,
397    /// Size in bytes of the compact filter cache.
398    pub filter_cache_size: usize,
399}
400
401impl Default for Limits {
402    fn default() -> Self {
403        Self {
404            max_outbound_peers: peermgr::TARGET_OUTBOUND_PEERS,
405            max_inbound_peers: peermgr::MAX_INBOUND_PEERS,
406            filter_cache_size: cbfmgr::DEFAULT_FILTER_CACHE_SIZE,
407        }
408    }
409}
410
411/// State machine configuration.
412#[derive(Debug, Clone)]
413pub struct Config {
414    /// Bitcoin network we are connected to.
415    pub network: network::Network,
416    /// Peers to connect to.
417    pub connect: Vec<net::SocketAddr>,
418    /// Supported communication domains.
419    pub domains: Vec<Domain>,
420    /// Services offered by our peer.
421    pub services: ServiceFlags,
422    /// Required peer services.
423    pub required_services: ServiceFlags,
424    /// Peer whitelist. Peers in this list are trusted by default.
425    pub whitelist: Whitelist,
426    /// Consensus parameters.
427    pub params: Params,
428    /// Our protocol version.
429    pub protocol_version: u32,
430    /// Our user agent.
431    pub user_agent: &'static str,
432    /// Ping timeout, after which remotes are disconnected.
433    pub ping_timeout: LocalDuration,
434    /// State machine event hooks.
435    pub hooks: Hooks,
436    /// Configured limits.
437    pub limits: Limits,
438}
439
440impl Default for Config {
441    fn default() -> Self {
442        Self {
443            network: network::Network::default(),
444            params: Params::new(network::Network::default().into()),
445            connect: Vec::new(),
446            domains: Domain::all(),
447            services: ServiceFlags::NONE,
448            required_services: ServiceFlags::NETWORK,
449            whitelist: Whitelist::default(),
450            protocol_version: PROTOCOL_VERSION,
451            ping_timeout: pingmgr::PING_TIMEOUT,
452            user_agent: USER_AGENT,
453            hooks: Hooks::default(),
454            limits: Limits::default(),
455        }
456    }
457}
458
459impl Config {
460    /// Construct a new configuration.
461    pub fn from(network: network::Network, connect: Vec<net::SocketAddr>) -> Self {
462        let params = Params::new(network.into());
463
464        Self {
465            network,
466            connect,
467            params,
468            ..Self::default()
469        }
470    }
471
472    /// Get the listen port.
473    pub fn port(&self) -> u16 {
474        self.network.port()
475    }
476}
477
478/// Peer whitelist.
479#[derive(Debug, Clone, Default)]
480pub struct Whitelist {
481    /// Trusted addresses.
482    addr: HashSet<net::IpAddr>,
483    /// Trusted user-agents.
484    user_agent: HashSet<String>,
485}
486
487impl Whitelist {
488    fn contains(&self, addr: &net::IpAddr, user_agent: &str) -> bool {
489        self.addr.contains(addr) || self.user_agent.contains(user_agent)
490    }
491}
492
493impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> StateMachine<T, F, P, C> {
494    /// Construct a new protocol instance.
495    pub fn new(
496        tree: T,
497        filters: F,
498        peers: P,
499        clock: C,
500        rng: fastrand::Rng,
501        config: Config,
502    ) -> Self {
503        let Config {
504            network,
505            connect,
506            domains,
507            services,
508            whitelist,
509            protocol_version,
510            ping_timeout,
511            user_agent,
512            required_services,
513            params,
514            hooks,
515            limits,
516        } = config;
517
518        let outbox = Outbox::new(network, protocol_version);
519        let inbox = HashMap::new();
520        let syncmgr = SyncManager::new(
521            syncmgr::Config {
522                max_message_headers: syncmgr::MAX_MESSAGE_HEADERS,
523                request_timeout: syncmgr::REQUEST_TIMEOUT,
524                params,
525            },
526            rng.clone(),
527            outbox.clone(),
528            clock.clone(),
529        );
530        let pingmgr = PingManager::new(ping_timeout, rng.clone(), outbox.clone(), clock.clone());
531        let cbfmgr = FilterManager::new(
532            cbfmgr::Config {
533                filter_cache_size: limits.filter_cache_size,
534                ..cbfmgr::Config::default()
535            },
536            rng.clone(),
537            filters,
538            outbox.clone(),
539            clock.clone(),
540        );
541        let peermgr = PeerManager::new(
542            peermgr::Config {
543                protocol_version: PROTOCOL_VERSION,
544                whitelist,
545                persistent: connect,
546                domains: domains.clone(),
547                target_outbound_peers: limits.max_outbound_peers,
548                max_inbound_peers: limits.max_inbound_peers,
549                retry_max_wait: LocalDuration::from_mins(60),
550                retry_min_wait: LocalDuration::from_secs(1),
551                required_services,
552                preferred_services: syncmgr::REQUIRED_SERVICES | cbfmgr::REQUIRED_SERVICES,
553                services,
554                user_agent,
555            },
556            rng.clone(),
557            hooks.clone(),
558            outbox.clone(),
559            clock.clone(),
560        );
561        let addrmgr = AddressManager::new(
562            addrmgr::Config {
563                required_services,
564                domains,
565            },
566            rng.clone(),
567            peers,
568            outbox.clone(),
569            clock.clone(),
570        );
571        let invmgr = InventoryManager::new(rng.clone(), outbox.clone(), clock.clone());
572
573        Self {
574            tree,
575            network,
576            clock,
577            inbox,
578            addrmgr,
579            syncmgr,
580            pingmgr,
581            cbfmgr,
582            peermgr,
583            invmgr,
584            last_tick: LocalTime::default(),
585            rng,
586            outbox,
587            hooks,
588        }
589    }
590
591    /// Disconnect a peer.
592    pub fn disconnect(&mut self, addr: PeerId, reason: DisconnectReason) {
593        // TODO: Trigger disconnection everywhere, as if peer disconnected. This
594        // avoids being in a state where we know a peer is about to get disconnected,
595        // but we still process messages from it as normal.
596
597        self.peermgr.disconnect(addr, reason);
598    }
599
600    /// Create a draining iterator over the protocol outputs.
601    pub fn drain(&mut self) -> Box<dyn Iterator<Item = output::Io> + '_> {
602        Box::new(std::iter::from_fn(|| self.next()))
603    }
604
605    /// Send a message to a all peers matching the predicate.
606    fn broadcast<Q>(&mut self, msg: NetworkMessage, predicate: Q) -> Vec<PeerId>
607    where
608        Q: Fn(&Peer) -> bool,
609    {
610        let mut peers = Vec::new();
611
612        for peer in self.peermgr.peers().map(Peer::from) {
613            if predicate(&peer) {
614                peers.push(peer.addr);
615                self.outbox.message(peer.addr, msg.clone());
616            }
617        }
618        peers
619    }
620
621    /// Send a message to a random outbound peer. Returns the peer id.
622    fn query<Q>(&mut self, msg: NetworkMessage, f: Q) -> Option<PeerId>
623    where
624        Q: Fn(&Peer) -> bool,
625    {
626        let peers = self
627            .peermgr
628            .negotiated(Link::Outbound)
629            .map(Peer::from)
630            .filter(f)
631            .collect::<Vec<_>>();
632
633        match peers.len() {
634            n if n > 0 => {
635                let r = self.rng.usize(..n);
636                let p = peers.get(r).unwrap();
637
638                self.outbox.message(p.addr, msg);
639
640                Some(p.addr)
641            }
642            _ => None,
643        }
644    }
645}
646
647impl<T, F, P, C> Iterator for StateMachine<T, F, P, C> {
648    type Item = output::Io;
649
650    fn next(&mut self) -> Option<output::Io> {
651        self.outbox.next()
652    }
653}
654
655impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> StateMachine<T, F, P, C> {
656    /// Process a user command.
657    pub fn command(&mut self, cmd: Command) {
658        debug!(target: "p2p", "Received command: {:?}", cmd);
659
660        match cmd {
661            Command::QueryTree(query) => {
662                query(&self.tree);
663            }
664            Command::GetBlockByHeight(height, reply) => {
665                let header = self.tree.get_block_by_height(height).map(|h| h.to_owned());
666
667                reply.send(header).ok();
668            }
669            Command::GetPeers(services, reply) => {
670                let peers = self
671                    .peermgr
672                    .peers()
673                    .filter(|(p, _)| p.is_negotiated())
674                    .filter(|(p, _)| p.services.has(services))
675                    .map(Peer::from)
676                    .collect::<Vec<Peer>>();
677
678                reply.send(peers).ok();
679            }
680            Command::Connect(addr) => {
681                self.peermgr.whitelist(addr);
682                self.peermgr.connect(&addr);
683            }
684            Command::Disconnect(addr) => {
685                self.disconnect(addr, DisconnectReason::Command);
686            }
687            Command::Query(msg, reply) => {
688                reply.send(self.query(msg, |_| true)).ok();
689            }
690            Command::Broadcast(msg, predicate, reply) => {
691                let peers = self.broadcast(msg, |p| predicate(p.clone()));
692                reply.send(peers).ok();
693            }
694            Command::ImportHeaders(headers, reply) => {
695                let result = self
696                    .syncmgr
697                    .import_blocks(headers.into_iter(), &mut self.tree);
698
699                match result {
700                    Ok(import_result) => {
701                        reply.send(Ok(import_result)).ok();
702                    }
703                    Err(err) => {
704                        reply.send(Err(err)).ok();
705                    }
706                }
707            }
708            Command::ImportAddresses(addrs) => {
709                self.addrmgr.insert(
710                    // Nb. For imported addresses, the time last active is not relevant.
711                    addrs.into_iter().map(|a| (BlockTime::default(), a)),
712                    peer::Source::Imported,
713                );
714            }
715            Command::GetTip(reply) => {
716                let (_, header) = self.tree.tip();
717                let height = self.tree.height();
718
719                reply.send((height, header)).ok();
720            }
721            Command::GetFilters(range, reply) => {
722                let result = self.cbfmgr.get_cfilters(range, &self.tree);
723                reply.send(result).ok();
724            }
725            Command::GetBlock(hash) => {
726                self.invmgr.get_block(hash);
727            }
728            Command::SubmitTransaction(tx, reply) => {
729                // Update local watchlist to track submitted transactions.
730                //
731                // Nb. This is currently non-optimal, as the cfilter matching is based on the
732                // output scripts. This may trigger false-positives, since the same
733                // invoice (address) can be re-used by multiple transactions, ie. outputs
734                // can figure in more than one block.
735                self.cbfmgr.watch_transaction(&tx);
736
737                // TODO: For BIP 339 support, we can send a `WTx` inventory here.
738                let peers = self.invmgr.announce(tx);
739
740                if let Some(peers) = NonEmpty::from_vec(peers) {
741                    reply.send(Ok(peers)).ok();
742                } else {
743                    reply.send(Err(CommandError::NotConnected)).ok();
744                }
745            }
746            Command::Rescan { from, to, watch } => {
747                // A rescan with a new watch list may return matches on cached filters.
748                for (_, hash) in self.cbfmgr.rescan(from, to, watch, &self.tree) {
749                    self.invmgr.get_block(hash);
750                }
751            }
752            Command::Watch { watch } => {
753                self.cbfmgr.watch(watch);
754            }
755        }
756    }
757}
758
759impl<T: BlockTree, F: Filters, P: peer::Store, C: AdjustedClock<PeerId>> traits::StateMachine
760    for StateMachine<T, F, P, C>
761{
762    type Message = RawNetworkMessage;
763    type Event = Event;
764    type DisconnectReason = DisconnectReason;
765
766    fn initialize(&mut self, time: LocalTime) {
767        self.clock.set(time);
768        self.outbox.event(Event::Initializing);
769        self.addrmgr.initialize();
770        self.syncmgr.initialize(&self.tree);
771        self.peermgr.initialize(&mut self.addrmgr);
772        self.cbfmgr.initialize(&self.tree);
773        self.outbox.event(Event::Ready {
774            height: self.tree.height(),
775            filter_height: self.cbfmgr.filters.height(),
776            time,
777        });
778    }
779
780    fn message_received(&mut self, addr: &net::SocketAddr, msg: Cow<RawNetworkMessage>) {
781        let now = self.clock.local_time();
782        let cmd = msg.cmd();
783        let addr = *addr;
784        let msg = msg.into_owned();
785
786        if msg.magic != self.network.magic() {
787            return self.disconnect(addr, DisconnectReason::PeerMagic(msg.magic));
788        }
789
790        if !self.peermgr.is_connected(&addr) {
791            debug!(target: "p2p", "Received {:?} from unknown peer {}", cmd, addr);
792            return;
793        }
794
795        debug!(target: "p2p", "Received {:?} from {}", cmd, addr);
796
797        if let Err(err) = (self.hooks.on_message)(addr, &msg.payload, &self.outbox) {
798            debug!(
799                target: "p2p",
800                "Message {:?} from {} dropped by user hook: {}",
801                cmd, addr, err
802            );
803            return;
804        }
805
806        match msg.payload {
807            NetworkMessage::Version(msg) => {
808                let height = self.tree.height();
809
810                self.peermgr
811                    .received_version(&addr, msg, height, &mut self.addrmgr);
812            }
813            NetworkMessage::Verack => {
814                if let Some((peer, conn)) = self.peermgr.received_verack(&addr, now) {
815                    self.clock.record_offset(conn.socket.addr, peer.time_offset);
816                    self.addrmgr
817                        .peer_negotiated(&addr, peer.services, conn.link);
818                    self.pingmgr.peer_negotiated(conn.socket.addr);
819                    self.cbfmgr.peer_negotiated(
820                        conn.socket.clone(),
821                        peer.height,
822                        peer.services,
823                        conn.link,
824                        peer.persistent,
825                        &self.tree,
826                    );
827                    self.syncmgr.peer_negotiated(
828                        conn.socket.clone(),
829                        peer.height,
830                        peer.services,
831                        !peer.services.has(cbfmgr::REQUIRED_SERVICES),
832                        conn.link,
833                        &self.tree,
834                    );
835                    self.invmgr.peer_negotiated(
836                        conn.socket,
837                        peer.services,
838                        peer.relay,
839                        peer.wtxidrelay,
840                    );
841                }
842            }
843            NetworkMessage::Ping(nonce) => {
844                if self.pingmgr.received_ping(addr, nonce) {
845                    self.addrmgr.peer_active(addr);
846                }
847            }
848            NetworkMessage::Pong(nonce) => {
849                if self.pingmgr.received_pong(addr, nonce, now) {
850                    self.addrmgr.peer_active(addr);
851                }
852            }
853            NetworkMessage::Headers(headers) => {
854                match self
855                    .syncmgr
856                    .received_headers(&addr, headers, &self.clock, &mut self.tree)
857                {
858                    Err(e) => log::error!("Error receiving headers: {}", e),
859                    Ok(ImportResult::TipChanged(_, _, _, reverted, _)) => {
860                        // Nb. the reverted blocks are ordered from the tip down to
861                        // the oldest ancestor.
862                        if let Some((height, _)) = reverted.last() {
863                            // The height we need to rollback to, ie. the tip of our new chain
864                            // and the tallest block we are keeping.
865                            let fork_height = height - 1;
866                            self.cbfmgr.rollback(fork_height).unwrap();
867
868                            for (height, _) in reverted {
869                                for tx in self.invmgr.block_reverted(height) {
870                                    self.cbfmgr.watch_transaction(&tx);
871                                }
872                            }
873                        }
874                        // Trigger a filter sync, since we're going to have to catch up on the
875                        // new block header(s). This is not required, but reduces latency.
876                        //
877                        // In the case of a re-org, this will trigger a re-download of the
878                        // missing headers after the rollback.
879                        self.cbfmgr.sync(&self.tree);
880                    }
881                    _ => {}
882                }
883            }
884            NetworkMessage::GetHeaders(GetHeadersMessage {
885                locator_hashes,
886                stop_hash,
887                ..
888            }) => {
889                self.syncmgr
890                    .received_getheaders(&addr, (locator_hashes, stop_hash), &self.tree);
891            }
892            NetworkMessage::Block(block) => {
893                for confirmed in self.invmgr.received_block(&addr, block, &self.tree) {
894                    self.cbfmgr.unwatch_transaction(&confirmed);
895                }
896            }
897            NetworkMessage::Inv(inventory) => {
898                self.syncmgr.received_inv(addr, inventory, &self.tree);
899                // TODO: invmgr: Update block availability for this peer.
900            }
901            NetworkMessage::CFHeaders(msg) => {
902                match self.cbfmgr.received_cfheaders(&addr, msg, &self.tree) {
903                    Err(cbfmgr::Error::InvalidMessage { reason, .. }) => {
904                        self.disconnect(addr, DisconnectReason::PeerMisbehaving(reason))
905                    }
906                    Err(err) => {
907                        log::warn!(target: "p2p", "Error receiving filter headers: {}", err);
908                    }
909                    Ok(_) => {}
910                }
911            }
912            NetworkMessage::GetCFHeaders(msg) => {
913                match self.cbfmgr.received_getcfheaders(&addr, msg, &self.tree) {
914                    Err(cbfmgr::Error::InvalidMessage { reason, .. }) => {
915                        self.disconnect(addr, DisconnectReason::PeerMisbehaving(reason))
916                    }
917                    _ => {}
918                }
919            }
920            NetworkMessage::CFilter(msg) => {
921                match self.cbfmgr.received_cfilter(&addr, msg, &self.tree) {
922                    Ok(matches) => {
923                        for (_, hash) in matches {
924                            self.invmgr.get_block(hash);
925                        }
926                    }
927                    Err(cbfmgr::Error::InvalidMessage { reason, .. }) => {
928                        self.disconnect(addr, DisconnectReason::PeerMisbehaving(reason))
929                    }
930                    Err(cbfmgr::Error::Ignored { .. } | cbfmgr::Error::Filters { .. }) => {}
931                }
932            }
933            NetworkMessage::GetCFilters(msg) => {
934                (*self.hooks.on_getcfilters)(addr, msg, &self.outbox);
935            }
936            NetworkMessage::Addr(addrs) => {
937                self.addrmgr.received_addr(addr, addrs);
938                // TODO: Tick the peer manager, because we may have new addresses to connect to.
939            }
940            NetworkMessage::GetAddr => {
941                self.addrmgr.received_getaddr(&addr);
942            }
943            NetworkMessage::GetData(invs) => {
944                self.invmgr.received_getdata(addr, &invs);
945                (*self.hooks.on_getdata)(addr, invs, &self.outbox);
946            }
947            NetworkMessage::WtxidRelay => {
948                self.peermgr.received_wtxidrelay(&addr);
949            }
950            NetworkMessage::SendHeaders => {
951                // We adhere to `sendheaders` by default.
952            }
953            NetworkMessage::Unknown {
954                command: ref cmd, ..
955            } => {
956                warn!(target: "p2p", "Ignoring unknown message {:?} from {}", cmd, addr)
957            }
958            _ => {
959                warn!(target: "p2p", "Ignoring {:?} from {}", cmd, addr);
960            }
961        }
962    }
963
964    fn attempted(&mut self, addr: &net::SocketAddr) {
965        self.addrmgr.peer_attempted(addr);
966        self.peermgr.peer_attempted(addr);
967    }
968
969    fn connected(&mut self, addr: net::SocketAddr, local_addr: &net::SocketAddr, link: Link) {
970        let height = self.tree.height();
971
972        self.addrmgr.record_local_address(*local_addr);
973        self.addrmgr.peer_connected(&addr);
974        self.peermgr.peer_connected(addr, *local_addr, link, height);
975        self.inbox
976            .insert(addr, stream::Decoder::new(INBOX_BUFFER_SIZE));
977    }
978
979    fn disconnected(
980        &mut self,
981        addr: &net::SocketAddr,
982        reason: nakamoto_net::Disconnect<DisconnectReason>,
983    ) {
984        self.cbfmgr.peer_disconnected(addr);
985        self.syncmgr.peer_disconnected(addr);
986        self.addrmgr.peer_disconnected(addr, reason.clone());
987        self.pingmgr.peer_disconnected(addr);
988        self.peermgr
989            .peer_disconnected(addr, &mut self.addrmgr, reason);
990        self.invmgr.peer_disconnected(addr);
991    }
992
993    fn tick(&mut self, local_time: LocalTime) {
994        trace!("Received tick");
995
996        self.clock.set(local_time);
997    }
998
999    fn timer_expired(&mut self) {
1000        trace!("Received wake");
1001
1002        self.invmgr.received_wake(&self.tree);
1003        self.syncmgr.received_wake(&self.tree);
1004        self.pingmgr.received_wake();
1005        self.addrmgr.received_wake();
1006        self.peermgr.received_wake(&mut self.addrmgr);
1007        self.cbfmgr.received_wake(&self.tree);
1008
1009        #[cfg(not(test))]
1010        let local_time = self.clock.local_time();
1011        #[cfg(not(test))]
1012        if local_time - self.last_tick >= LocalDuration::from_secs(10) {
1013            let (tip, _) = self.tree.tip();
1014            let height = self.tree.height();
1015            let best = self
1016                .syncmgr
1017                .best_height()
1018                .unwrap_or_else(|| self.tree.height());
1019            let sync = if best > 0 {
1020                height as f64 / best as f64 * 100.
1021            } else {
1022                0.
1023            };
1024            let outbound = self.peermgr.negotiated(Link::Outbound).count();
1025            let inbound = self.peermgr.negotiated(Link::Inbound).count();
1026            let connecting = self.peermgr.connecting().count();
1027            let target = self.peermgr.config.target_outbound_peers;
1028            let max_inbound = self.peermgr.config.max_inbound_peers;
1029            let addresses = self.addrmgr.len();
1030            let preferred = self
1031                .peermgr
1032                .negotiated(Link::Outbound)
1033                .filter(|(p, _)| p.services.has(self.peermgr.config.preferred_services))
1034                .count();
1035
1036            // TODO: Add cache sizes on disk
1037            // TODO: Add protocol state(s)
1038            // TODO: Trim block hash
1039            // TODO: Add average headers/s or bandwidth
1040
1041            let mut msg = Vec::new();
1042
1043            msg.push(format!("tip = {}", tip));
1044            msg.push(format!("headers = {}/{} ({:.1}%)", height, best, sync));
1045            msg.push(format!(
1046                "cfheaders = {}/{}",
1047                self.cbfmgr.filters.height(),
1048                height
1049            ));
1050            msg.push(format!("inbound = {}/{}", inbound, max_inbound));
1051            msg.push(format!(
1052                "outbound = {}/{} ({})",
1053                outbound, target, preferred,
1054            ));
1055            msg.push(format!("connecting = {}/{}", connecting, target));
1056            msg.push(format!("addresses = {}", addresses));
1057
1058            log::info!(target: "p2p", "{}", msg.join(", "));
1059
1060            if self.cbfmgr.rescan.active {
1061                let rescan = &self.cbfmgr.rescan;
1062                log::info!(target: "p2p", "{}", rescan.info());
1063            }
1064            log::info!(
1065                target: "p2p",
1066                "inventory block queue = {}, requested = {}, mempool = {}",
1067                self.invmgr.received.len(),
1068                self.invmgr.remaining.len(),
1069                self.invmgr.mempool.len(),
1070            );
1071
1072            self.last_tick = local_time;
1073        }
1074    }
1075}