nakamoto_client/
client.rs

1//! Core nakamoto client functionality. Wraps all the other modules under a unified
2//! interface.
3use std::collections::HashMap;
4use std::env;
5use std::fs;
6use std::io;
7use std::net;
8use std::ops::ControlFlow;
9use std::ops::RangeInclusive;
10use std::path::PathBuf;
11use std::time::{self, SystemTime};
12
13pub use crossbeam_channel as chan;
14
15use nakamoto_chain::block::{store, Block};
16use nakamoto_chain::filter;
17use nakamoto_chain::filter::cache::FilterCache;
18use nakamoto_chain::filter::cache::StoredHeader;
19use nakamoto_chain::{block::cache::BlockCache, filter::BlockFilter};
20
21use nakamoto_common::bitcoin::network::constants::ServiceFlags;
22use nakamoto_common::bitcoin::network::message::NetworkMessage;
23use nakamoto_common::bitcoin::network::Address;
24use nakamoto_common::block::store::{Genesis as _, Store as _};
25use nakamoto_common::block::time::{AdjustedTime, RefClock};
26use nakamoto_common::block::tree::{self, BlockReader, ImportResult};
27use nakamoto_common::block::{BlockHash, BlockHeader, Height, Transaction};
28use nakamoto_common::nonempty::NonEmpty;
29use nakamoto_common::p2p::peer::{Source, Store as _};
30use nakamoto_p2p::fsm;
31
32pub use nakamoto_common::network;
33pub use nakamoto_common::network::Network;
34pub use nakamoto_common::p2p::Domain;
35pub use nakamoto_net::event;
36pub use nakamoto_p2p::fsm::{Command, CommandError, Hooks, Limits, Link, Peer};
37
38pub use crate::error::Error;
39pub use crate::event::{Event, Loading};
40pub use crate::handle;
41pub use crate::service::Service;
42
43use crate::event::Mapper;
44use crate::peer;
45use nakamoto_net::{Reactor, Waker};
46
47/// Client configuration.
48#[derive(Debug, Clone)]
49pub struct Config {
50    /// Bitcoin network.
51    pub network: Network,
52    /// Connect via these network domains, eg. IPv4, IPv6.
53    pub domains: Vec<Domain>,
54    /// Peers to connect to instead of using the peer discovery mechanism.
55    pub connect: Vec<net::SocketAddr>,
56    /// Client listen addresses.
57    pub listen: Vec<net::SocketAddr>,
58    /// Client home path, where runtime data is stored, eg. block headers and filters.
59    pub root: PathBuf,
60    /// Verify on-disk data at load time.
61    /// This can be set to `true` for additional checks, if for example data integrity
62    /// of the file system is not guaranteed, or the file system is untrusted.
63    pub verify: bool,
64    /// User agent string.
65    pub user_agent: &'static str,
66    /// Client hooks.
67    pub hooks: Hooks,
68    /// Services offered by this node.
69    pub services: ServiceFlags,
70    /// Configured limits.
71    pub limits: Limits,
72}
73
74/// Configuration for loading event handling.
75#[derive(Default)]
76pub enum LoadingHandler {
77    /// Ignore events.
78    #[default]
79    Ignore,
80    /// Send events to given channel.
81    Channel(chan::Sender<Loading>),
82}
83
84impl From<chan::Sender<Loading>> for LoadingHandler {
85    fn from(c: chan::Sender<Loading>) -> Self {
86        Self::Channel(c)
87    }
88}
89
90impl LoadingHandler {
91    fn send(&self, event: Loading) -> ControlFlow<()> {
92        match self {
93            Self::Ignore => ControlFlow::Continue(()),
94            Self::Channel(channel) => {
95                if channel.send(event).is_ok() {
96                    ControlFlow::Continue(())
97                } else {
98                    ControlFlow::Break(())
99                }
100            }
101        }
102    }
103}
104
105impl Config {
106    /// Create a new configuration for the given network.
107    pub fn new(network: Network) -> Self {
108        Self {
109            network,
110            ..Self::default()
111        }
112    }
113}
114
115impl Default for Config {
116    fn default() -> Self {
117        Self {
118            network: Network::default(),
119            connect: Vec::new(),
120            domains: Domain::all(),
121            listen: vec![([0, 0, 0, 0], 0).into()],
122            root: PathBuf::from(env::var("HOME").unwrap_or_default()),
123            verify: false,
124            user_agent: fsm::USER_AGENT,
125            hooks: Hooks::default(),
126            limits: Limits::default(),
127            services: ServiceFlags::NONE,
128        }
129    }
130}
131
132/// The client's event publisher.
133struct Publisher<E> {
134    publishers: Vec<Box<dyn nakamoto_net::Publisher<E>>>,
135}
136
137impl<E> Publisher<E> {
138    /// Register a publisher.
139    pub fn register(mut self, publisher: impl nakamoto_net::Publisher<E> + 'static) -> Self {
140        self.publishers.push(Box::new(publisher));
141        self
142    }
143}
144
145impl<E> Default for Publisher<E> {
146    fn default() -> Self {
147        Self {
148            publishers: Vec::new(),
149        }
150    }
151}
152
153impl<E> nakamoto_net::Publisher<E> for Publisher<E>
154where
155    E: Clone,
156{
157    fn publish(&mut self, e: E) {
158        for p in self.publishers.iter_mut() {
159            p.publish(e.clone());
160        }
161    }
162}
163
164/// Runs a pre-loaded client.
165pub struct ClientRunner<R> {
166    service: Service<
167        BlockCache<store::File<BlockHeader>>,
168        FilterCache<store::File<StoredHeader>>,
169        peer::Cache,
170        RefClock<AdjustedTime<net::SocketAddr>>,
171    >,
172    listen: Vec<net::SocketAddr>,
173    commands: chan::Receiver<Command>,
174    publisher: Publisher<fsm::Event>,
175    reactor: R,
176}
177
178impl<R: Reactor> ClientRunner<R> {
179    /// Run a pre-loaded client.
180    pub fn run(mut self) -> Result<(), Error> {
181        self.reactor
182            .run(&self.listen, self.service, self.publisher, self.commands)?;
183
184        Ok(())
185    }
186}
187
188/// A light-client process.
189pub struct Client<R: Reactor> {
190    handle: Handle<R::Waker>,
191    commands: chan::Receiver<Command>,
192    publisher: Publisher<fsm::Event>,
193    reactor: R,
194}
195
196impl<R: Reactor> Client<R> {
197    /// Create a new client.
198    pub fn new() -> Result<Self, Error> {
199        let (commands_tx, commands_rx) = chan::unbounded::<Command>();
200        let (event_pub, events) = event::broadcast(|e, p| p.emit(e));
201        let (blocks_pub, blocks) = event::broadcast(|e, p| {
202            if let fsm::Event::Inventory(fsm::InventoryEvent::BlockProcessed {
203                block,
204                height,
205                ..
206            }) = e
207            {
208                p.emit((block, height));
209            }
210        });
211        let (filters_pub, filters) = event::broadcast(|e, p| {
212            if let fsm::Event::Filter(fsm::FilterEvent::FilterReceived {
213                filter,
214                block_hash,
215                height,
216                ..
217            }) = e
218            {
219                p.emit((filter, block_hash, height));
220            }
221        });
222        let (publisher, subscriber) = event::broadcast({
223            let mut mapper = Mapper::default();
224            move |e, p| mapper.process(e, p)
225        });
226
227        let publisher = Publisher::default()
228            .register(event_pub)
229            .register(blocks_pub)
230            .register(filters_pub)
231            .register(publisher);
232
233        let (shutdown, shutdown_recv) = chan::bounded(1);
234        let (listening_send, listening) = chan::bounded(1);
235        let reactor = <R as Reactor>::new(shutdown_recv, listening_send)?;
236        let handle = Handle {
237            commands: commands_tx,
238            events,
239            blocks,
240            filters,
241            subscriber,
242            waker: reactor.waker(),
243            timeout: time::Duration::from_secs(60),
244            shutdown,
245            listening,
246        };
247
248        Ok(Self {
249            handle,
250            commands: commands_rx,
251            publisher,
252            reactor,
253        })
254    }
255
256    /// Load the client configuration. Takes a loading handler that can optionally receive
257    /// loading events.
258    pub fn load(
259        self,
260        config: Config,
261        loading: impl Into<LoadingHandler>,
262    ) -> Result<ClientRunner<R>, Error> {
263        let loading = loading.into();
264        let home = config.root.join(".nakamoto");
265        let network = config.network;
266        let dir = home.join(network.as_str());
267        let listen = config.listen.clone();
268
269        fs::create_dir_all(&dir)?;
270
271        let genesis = network.genesis();
272        let params = network.params();
273
274        log::info!(target: "client", "Initializing client ({:?})..", network);
275        log::info!(target: "client", "Genesis block hash is {}", network.genesis_hash());
276
277        let path = dir.join("headers.db");
278        let store = match store::File::create(&path, genesis) {
279            Ok(store) => {
280                log::info!(target: "client", "Initializing new block store {:?}", path);
281                store
282            }
283            Err(store::Error::Io(e)) if e.kind() == io::ErrorKind::AlreadyExists => {
284                log::info!(target: "client", "Found existing store {:?}", path);
285                let store = store::File::open(path, genesis)?;
286
287                if store.check().is_err() {
288                    log::warn!(target: "client", "Corruption detected in header store, healing..");
289                    store.heal()?; // Rollback store to the last valid header.
290                }
291                log::info!(target: "client", "Store height = {}", store.height()?);
292
293                store
294            }
295            Err(err) => return Err(err.into()),
296        };
297
298        let local_time = SystemTime::now().into();
299        let checkpoints = network.checkpoints().collect::<Vec<_>>();
300        let clock = AdjustedTime::<net::SocketAddr>::new(local_time);
301        let rng = fastrand::Rng::new();
302
303        log::info!(target: "client", "Loading block headers from store..");
304
305        let cache = BlockCache::new(store, params, &checkpoints)?
306            .load_with(|height| loading.send(Loading::BlockHeaderLoaded { height }))?;
307
308        log::info!(target: "client", "Initializing block filters..");
309
310        let cfheaders_genesis = filter::cache::StoredHeader::genesis(network);
311        let cfheaders_path = dir.join("filters.db");
312        let cfheaders_store = match store::File::create(&cfheaders_path, cfheaders_genesis) {
313            Ok(store) => {
314                log::info!(target: "client", "Initializing new filter header store {:?}", cfheaders_path);
315                store
316            }
317            Err(store::Error::Io(e)) if e.kind() == io::ErrorKind::AlreadyExists => {
318                log::info!(target: "client", "Found existing store {:?}", cfheaders_path);
319                let store = store::File::open(cfheaders_path, cfheaders_genesis)?;
320
321                if store.check().is_err() {
322                    log::warn!(target: "client", "Corruption detected in filter store, healing..");
323                    store.heal()?; // Rollback store to the last valid header.
324                }
325                log::info!(target: "client", "Filters height = {}", store.height()?);
326
327                store
328            }
329            Err(err) => return Err(err.into()),
330        };
331        log::info!(target: "client", "Loading filter headers from store..");
332
333        let filters = FilterCache::load_with(cfheaders_store, |height| {
334            loading.send(Loading::FilterHeaderLoaded { height })
335        })?;
336
337        if config.verify {
338            log::info!(target: "client", "Verifying filter headers..");
339
340            filters.verify_with(network, |height| {
341                loading.send(Loading::FilterHeaderVerified { height })
342            })?; // Verify store integrity.
343        } else {
344            log::info!(target: "client", "Skipping filter header verification (verify = false)")
345        }
346
347        log::info!(target: "client", "Loading peer addresses..");
348
349        let peers_path = dir.join("peers.json");
350        let mut peers = match peer::Cache::create(&peers_path) {
351            Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
352                log::info!(target: "client", "Found existing peer cache {:?}", peers_path);
353                let cache = peer::Cache::open(&peers_path).map_err(Error::PeerStore)?;
354                let cfpeers = cache
355                    .iter()
356                    .filter(|(_, ka)| ka.addr.services.has(ServiceFlags::COMPACT_FILTERS))
357                    .count();
358
359                log::info!(
360                    target: "client",
361                    "{} peer(s) found.. {} with compact filters support",
362                    cache.len(),
363                    cfpeers
364                );
365                cache
366            }
367            Err(err) => {
368                return Err(Error::PeerStore(err));
369            }
370            Ok(cache) => {
371                log::info!(target: "client", "Initializing new peer address cache {:?}", peers_path);
372                cache
373            }
374        };
375
376        log::trace!(target: "client", "{:#?}", peers);
377
378        if config.connect.is_empty() && peers.is_empty() {
379            log::info!(target: "client", "Address book is empty. Trying DNS seeds..");
380            peers.seed(
381                network.seeds().iter().map(|s| (*s, network.port())),
382                Source::Dns,
383            )?;
384            peers.flush()?;
385
386            log::info!(target: "client", "{} seeds added to address book", peers.len());
387        }
388
389        Ok(ClientRunner {
390            listen,
391            commands: self.commands,
392            publisher: self.publisher,
393            reactor: self.reactor,
394            service: Service::new(cache, filters, peers, RefClock::from(clock), rng, config),
395        })
396    }
397
398    /// Start the client process. This function is meant to be run in its own thread.
399    pub fn run(self, config: Config) -> Result<(), Error> {
400        self.load(config, LoadingHandler::Ignore)?.run()
401    }
402
403    /// Start the client process, supplying the service manually.
404    /// This function is meant to be run in its own thread.
405    pub fn run_service<T>(mut self, listen: &[net::SocketAddr], service: T) -> Result<(), Error>
406    where
407        T: nakamoto_net::Service<Event = fsm::Event, Command = Command>,
408    {
409        self.reactor.run::<T, Publisher<fsm::Event>>(
410            listen,
411            service,
412            self.publisher,
413            self.commands,
414        )?;
415
416        Ok(())
417    }
418
419    /// Create a new handle to communicate with the client.
420    pub fn handle(&self) -> Handle<R::Waker> {
421        self.handle.clone()
422    }
423}
424
425/// An instance of [`handle::Handle`] for [`Client`].
426pub struct Handle<W: Waker> {
427    commands: chan::Sender<Command>,
428    events: event::Subscriber<fsm::Event>,
429    blocks: event::Subscriber<(Block, Height)>,
430    filters: event::Subscriber<(BlockFilter, BlockHash, Height)>,
431    subscriber: event::Subscriber<Event>,
432    waker: W,
433    timeout: time::Duration,
434    shutdown: chan::Sender<()>,
435    listening: chan::Receiver<net::SocketAddr>,
436}
437
438impl<W: Waker> Clone for Handle<W> {
439    fn clone(&self) -> Self {
440        Self {
441            blocks: self.blocks.clone(),
442            commands: self.commands.clone(),
443            events: self.events.clone(),
444            filters: self.filters.clone(),
445            subscriber: self.subscriber.clone(),
446            timeout: self.timeout,
447            waker: self.waker.clone(),
448            shutdown: self.shutdown.clone(),
449            listening: self.listening.clone(),
450        }
451    }
452}
453
454impl<W: Waker> Handle<W> {
455    /// Wait for node to start listening for incoming connections.
456    pub fn listening(&mut self) -> Result<net::SocketAddr, handle::Error> {
457        Ok(self.listening.recv_timeout(self.timeout)?)
458    }
459
460    /// Set the timeout for operations that wait on the network.
461    pub fn set_timeout(&mut self, timeout: time::Duration) {
462        self.timeout = timeout;
463    }
464
465    /// Get connected peers.
466    pub fn get_peers(&self, services: impl Into<ServiceFlags>) -> Result<Vec<Peer>, handle::Error> {
467        let (sender, recvr) = chan::bounded(1);
468        self._command(Command::GetPeers(services.into(), sender))?;
469
470        Ok(recvr.recv()?)
471    }
472
473    /// Get block by height.
474    pub fn get_block_by_height(
475        &self,
476        height: Height,
477    ) -> Result<Option<BlockHeader>, handle::Error> {
478        let (sender, recvr) = chan::bounded(1);
479        self._command(Command::GetBlockByHeight(height, sender))?;
480
481        Ok(recvr.recv()?)
482    }
483
484    /// Send a command to the command channel, and wake up the event loop.
485    fn _command(&self, cmd: Command) -> Result<(), handle::Error> {
486        self.commands.send(cmd)?;
487        self.waker.wake()?;
488
489        Ok(())
490    }
491}
492
493impl<W: Waker> handle::Handle for Handle<W> {
494    fn get_tip(&self) -> Result<(Height, BlockHeader), handle::Error> {
495        let (transmit, receive) = chan::bounded::<(Height, BlockHeader)>(1);
496        self.command(Command::GetTip(transmit))?;
497
498        Ok(receive.recv()?)
499    }
500
501    fn query_tree(
502        &self,
503        query: impl Fn(&dyn BlockReader) + Send + Sync + 'static,
504    ) -> Result<(), handle::Error> {
505        use std::sync::Arc;
506
507        self.command(Command::QueryTree(Arc::new(query)))?;
508
509        Ok(())
510    }
511
512    fn find_branch(
513        &self,
514        to: &BlockHash,
515    ) -> Result<Option<(Height, NonEmpty<BlockHeader>)>, handle::Error> {
516        let to = *to;
517        let (transmit, receive) = chan::bounded(1);
518
519        self.query_tree(move |t| {
520            transmit.send(t.find_branch(&to)).ok();
521        })?;
522
523        Ok(receive.recv()?)
524    }
525
526    fn get_block(&self, hash: &BlockHash) -> Result<(), handle::Error> {
527        self.command(Command::GetBlock(*hash))?;
528
529        Ok(())
530    }
531
532    fn get_filters(&self, range: RangeInclusive<Height>) -> Result<(), handle::Error> {
533        assert!(
534            !range.is_empty(),
535            "client::Handle::get_filters: range cannot be empty"
536        );
537        let (transmit, receive) = chan::bounded(1);
538        self.command(Command::GetFilters(range, transmit))?;
539
540        receive.recv()?.map_err(handle::Error::GetFilters)
541    }
542
543    fn blocks(&self) -> chan::Receiver<(Block, Height)> {
544        self.blocks.subscribe()
545    }
546
547    fn filters(&self) -> chan::Receiver<(BlockFilter, BlockHash, Height)> {
548        self.filters.subscribe()
549    }
550
551    fn events(&self) -> chan::Receiver<Event> {
552        self.subscriber.subscribe()
553    }
554
555    fn command(&self, cmd: Command) -> Result<(), handle::Error> {
556        self._command(cmd)
557    }
558
559    fn broadcast(
560        &self,
561        msg: NetworkMessage,
562        predicate: fn(Peer) -> bool,
563    ) -> Result<Vec<net::SocketAddr>, handle::Error> {
564        let (transmit, receive) = chan::bounded(1);
565        self.command(Command::Broadcast(msg, predicate, transmit))?;
566
567        Ok(receive.recv()?)
568    }
569
570    fn query(&self, msg: NetworkMessage) -> Result<Option<net::SocketAddr>, handle::Error> {
571        let (transmit, receive) = chan::bounded::<Option<net::SocketAddr>>(1);
572        self.command(Command::Query(msg, transmit))?;
573
574        Ok(receive.recv()?)
575    }
576
577    fn connect(&self, addr: net::SocketAddr) -> Result<Link, handle::Error> {
578        let events = self.events.subscribe();
579        self.command(Command::Connect(addr))?;
580
581        event::wait(
582            &events,
583            |e| match e {
584                fsm::Event::Peer(fsm::PeerEvent::Connected(a, link))
585                    if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
586                {
587                    Some(link)
588                }
589                _ => None,
590            },
591            self.timeout,
592        )
593        .map_err(handle::Error::from)
594    }
595
596    fn disconnect(&self, addr: net::SocketAddr) -> Result<(), handle::Error> {
597        let events = self.events.subscribe();
598
599        self.command(Command::Disconnect(addr))?;
600        event::wait(
601            &events,
602            |e| match e {
603                fsm::Event::Peer(fsm::PeerEvent::Disconnected(a, _))
604                    if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
605                {
606                    Some(())
607                }
608                _ => None,
609            },
610            self.timeout,
611        )?;
612
613        Ok(())
614    }
615
616    fn import_headers(
617        &self,
618        headers: Vec<BlockHeader>,
619    ) -> Result<Result<ImportResult, tree::Error>, handle::Error> {
620        let (transmit, receive) = chan::bounded::<Result<ImportResult, tree::Error>>(1);
621        self.command(Command::ImportHeaders(headers, transmit))?;
622
623        Ok(receive.recv()?)
624    }
625
626    fn import_addresses(&self, addrs: Vec<Address>) -> Result<(), handle::Error> {
627        self.command(Command::ImportAddresses(addrs))?;
628
629        Ok(())
630    }
631
632    fn submit_transaction(
633        &self,
634        tx: Transaction,
635    ) -> Result<NonEmpty<net::SocketAddr>, handle::Error> {
636        let (transmit, receive) = chan::bounded(1);
637        self.command(Command::SubmitTransaction(tx, transmit))?;
638
639        receive.recv()?.map_err(handle::Error::Command)
640    }
641
642    fn wait<F, T>(&self, f: F) -> Result<T, handle::Error>
643    where
644        F: FnMut(fsm::Event) -> Option<T>,
645    {
646        let events = self.events.subscribe();
647        let result = event::wait(&events, f, self.timeout)?;
648
649        Ok(result)
650    }
651
652    fn wait_for_peers(
653        &self,
654        count: usize,
655        required_services: impl Into<ServiceFlags>,
656    ) -> Result<Vec<(net::SocketAddr, Height, ServiceFlags)>, handle::Error> {
657        let events = self.events.subscribe();
658        let required_services = required_services.into();
659
660        let negotiated = self.get_peers(required_services)?;
661        if negotiated.len() == count {
662            return Ok(negotiated
663                .into_iter()
664                .map(|p| (p.addr, p.height, p.services))
665                .collect());
666        }
667
668        let mut negotiated = negotiated
669            .into_iter()
670            .map(|p| (p.addr, (p.height, p.services)))
671            .collect::<HashMap<_, _>>(); // Get already connected peers.
672
673        event::wait(
674            &events,
675            |e| match e {
676                fsm::Event::Peer(fsm::PeerEvent::Negotiated {
677                    addr,
678                    height,
679                    services,
680                    ..
681                }) => {
682                    if services.has(required_services) {
683                        negotiated.insert(addr, (height, services));
684                    }
685
686                    if negotiated.len() == count {
687                        Some(negotiated.iter().map(|(a, (h, s))| (*a, *h, *s)).collect())
688                    } else {
689                        None
690                    }
691                }
692                _ => None,
693            },
694            self.timeout,
695        )
696        .map_err(handle::Error::from)
697    }
698
699    fn wait_for_height(&self, h: Height) -> Result<BlockHash, handle::Error> {
700        let events = self.events.subscribe();
701
702        match self.get_block_by_height(h)? {
703            Some(e) => Ok(e.block_hash()),
704            None => event::wait(
705                &events,
706                |e| match e {
707                    fsm::Event::Chain(fsm::ChainEvent::Synced(hash, height)) if height == h => {
708                        Some(hash)
709                    }
710                    _ => None,
711                },
712                self.timeout,
713            )
714            .map_err(handle::Error::from),
715        }
716    }
717
718    fn shutdown(self) -> Result<(), handle::Error> {
719        self.shutdown.send(())?;
720        self.waker.wake()?;
721
722        Ok(())
723    }
724}
725
726/// Client traits re-exports.
727pub mod traits {
728    pub use crate::handle::Handle;
729}