1use std::collections::HashMap;
4use std::env;
5use std::fs;
6use std::io;
7use std::net;
8use std::ops::ControlFlow;
9use std::ops::RangeInclusive;
10use std::path::PathBuf;
11use std::time::{self, SystemTime};
12
13pub use crossbeam_channel as chan;
14
15use nakamoto_chain::block::{store, Block};
16use nakamoto_chain::filter;
17use nakamoto_chain::filter::cache::FilterCache;
18use nakamoto_chain::filter::cache::StoredHeader;
19use nakamoto_chain::{block::cache::BlockCache, filter::BlockFilter};
20
21use nakamoto_common::bitcoin::network::constants::ServiceFlags;
22use nakamoto_common::bitcoin::network::message::NetworkMessage;
23use nakamoto_common::bitcoin::network::Address;
24use nakamoto_common::block::store::{Genesis as _, Store as _};
25use nakamoto_common::block::time::{AdjustedTime, RefClock};
26use nakamoto_common::block::tree::{self, BlockReader, ImportResult};
27use nakamoto_common::block::{BlockHash, BlockHeader, Height, Transaction};
28use nakamoto_common::nonempty::NonEmpty;
29use nakamoto_common::p2p::peer::{Source, Store as _};
30use nakamoto_p2p::fsm;
31
32pub use nakamoto_common::network;
33pub use nakamoto_common::network::Network;
34pub use nakamoto_common::p2p::Domain;
35pub use nakamoto_net::event;
36pub use nakamoto_p2p::fsm::{Command, CommandError, Hooks, Limits, Link, Peer};
37
38pub use crate::error::Error;
39pub use crate::event::{Event, Loading};
40pub use crate::handle;
41pub use crate::service::Service;
42
43use crate::event::Mapper;
44use crate::peer;
45use nakamoto_net::{Reactor, Waker};
46
47#[derive(Debug, Clone)]
49pub struct Config {
50 pub network: Network,
52 pub domains: Vec<Domain>,
54 pub connect: Vec<net::SocketAddr>,
56 pub listen: Vec<net::SocketAddr>,
58 pub root: PathBuf,
60 pub verify: bool,
64 pub user_agent: &'static str,
66 pub hooks: Hooks,
68 pub services: ServiceFlags,
70 pub limits: Limits,
72}
73
74#[derive(Default)]
76pub enum LoadingHandler {
77 #[default]
79 Ignore,
80 Channel(chan::Sender<Loading>),
82}
83
84impl From<chan::Sender<Loading>> for LoadingHandler {
85 fn from(c: chan::Sender<Loading>) -> Self {
86 Self::Channel(c)
87 }
88}
89
90impl LoadingHandler {
91 fn send(&self, event: Loading) -> ControlFlow<()> {
92 match self {
93 Self::Ignore => ControlFlow::Continue(()),
94 Self::Channel(channel) => {
95 if channel.send(event).is_ok() {
96 ControlFlow::Continue(())
97 } else {
98 ControlFlow::Break(())
99 }
100 }
101 }
102 }
103}
104
105impl Config {
106 pub fn new(network: Network) -> Self {
108 Self {
109 network,
110 ..Self::default()
111 }
112 }
113}
114
115impl Default for Config {
116 fn default() -> Self {
117 Self {
118 network: Network::default(),
119 connect: Vec::new(),
120 domains: Domain::all(),
121 listen: vec![([0, 0, 0, 0], 0).into()],
122 root: PathBuf::from(env::var("HOME").unwrap_or_default()),
123 verify: false,
124 user_agent: fsm::USER_AGENT,
125 hooks: Hooks::default(),
126 limits: Limits::default(),
127 services: ServiceFlags::NONE,
128 }
129 }
130}
131
132struct Publisher<E> {
134 publishers: Vec<Box<dyn nakamoto_net::Publisher<E>>>,
135}
136
137impl<E> Publisher<E> {
138 pub fn register(mut self, publisher: impl nakamoto_net::Publisher<E> + 'static) -> Self {
140 self.publishers.push(Box::new(publisher));
141 self
142 }
143}
144
145impl<E> Default for Publisher<E> {
146 fn default() -> Self {
147 Self {
148 publishers: Vec::new(),
149 }
150 }
151}
152
153impl<E> nakamoto_net::Publisher<E> for Publisher<E>
154where
155 E: Clone,
156{
157 fn publish(&mut self, e: E) {
158 for p in self.publishers.iter_mut() {
159 p.publish(e.clone());
160 }
161 }
162}
163
164pub struct ClientRunner<R> {
166 service: Service<
167 BlockCache<store::File<BlockHeader>>,
168 FilterCache<store::File<StoredHeader>>,
169 peer::Cache,
170 RefClock<AdjustedTime<net::SocketAddr>>,
171 >,
172 listen: Vec<net::SocketAddr>,
173 commands: chan::Receiver<Command>,
174 publisher: Publisher<fsm::Event>,
175 reactor: R,
176}
177
178impl<R: Reactor> ClientRunner<R> {
179 pub fn run(mut self) -> Result<(), Error> {
181 self.reactor
182 .run(&self.listen, self.service, self.publisher, self.commands)?;
183
184 Ok(())
185 }
186}
187
188pub struct Client<R: Reactor> {
190 handle: Handle<R::Waker>,
191 commands: chan::Receiver<Command>,
192 publisher: Publisher<fsm::Event>,
193 reactor: R,
194}
195
196impl<R: Reactor> Client<R> {
197 pub fn new() -> Result<Self, Error> {
199 let (commands_tx, commands_rx) = chan::unbounded::<Command>();
200 let (event_pub, events) = event::broadcast(|e, p| p.emit(e));
201 let (blocks_pub, blocks) = event::broadcast(|e, p| {
202 if let fsm::Event::Inventory(fsm::InventoryEvent::BlockProcessed {
203 block,
204 height,
205 ..
206 }) = e
207 {
208 p.emit((block, height));
209 }
210 });
211 let (filters_pub, filters) = event::broadcast(|e, p| {
212 if let fsm::Event::Filter(fsm::FilterEvent::FilterReceived {
213 filter,
214 block_hash,
215 height,
216 ..
217 }) = e
218 {
219 p.emit((filter, block_hash, height));
220 }
221 });
222 let (publisher, subscriber) = event::broadcast({
223 let mut mapper = Mapper::default();
224 move |e, p| mapper.process(e, p)
225 });
226
227 let publisher = Publisher::default()
228 .register(event_pub)
229 .register(blocks_pub)
230 .register(filters_pub)
231 .register(publisher);
232
233 let (shutdown, shutdown_recv) = chan::bounded(1);
234 let (listening_send, listening) = chan::bounded(1);
235 let reactor = <R as Reactor>::new(shutdown_recv, listening_send)?;
236 let handle = Handle {
237 commands: commands_tx,
238 events,
239 blocks,
240 filters,
241 subscriber,
242 waker: reactor.waker(),
243 timeout: time::Duration::from_secs(60),
244 shutdown,
245 listening,
246 };
247
248 Ok(Self {
249 handle,
250 commands: commands_rx,
251 publisher,
252 reactor,
253 })
254 }
255
256 pub fn load(
259 self,
260 config: Config,
261 loading: impl Into<LoadingHandler>,
262 ) -> Result<ClientRunner<R>, Error> {
263 let loading = loading.into();
264 let home = config.root.join(".nakamoto");
265 let network = config.network;
266 let dir = home.join(network.as_str());
267 let listen = config.listen.clone();
268
269 fs::create_dir_all(&dir)?;
270
271 let genesis = network.genesis();
272 let params = network.params();
273
274 log::info!(target: "client", "Initializing client ({:?})..", network);
275 log::info!(target: "client", "Genesis block hash is {}", network.genesis_hash());
276
277 let path = dir.join("headers.db");
278 let store = match store::File::create(&path, genesis) {
279 Ok(store) => {
280 log::info!(target: "client", "Initializing new block store {:?}", path);
281 store
282 }
283 Err(store::Error::Io(e)) if e.kind() == io::ErrorKind::AlreadyExists => {
284 log::info!(target: "client", "Found existing store {:?}", path);
285 let store = store::File::open(path, genesis)?;
286
287 if store.check().is_err() {
288 log::warn!(target: "client", "Corruption detected in header store, healing..");
289 store.heal()?; }
291 log::info!(target: "client", "Store height = {}", store.height()?);
292
293 store
294 }
295 Err(err) => return Err(err.into()),
296 };
297
298 let local_time = SystemTime::now().into();
299 let checkpoints = network.checkpoints().collect::<Vec<_>>();
300 let clock = AdjustedTime::<net::SocketAddr>::new(local_time);
301 let rng = fastrand::Rng::new();
302
303 log::info!(target: "client", "Loading block headers from store..");
304
305 let cache = BlockCache::new(store, params, &checkpoints)?
306 .load_with(|height| loading.send(Loading::BlockHeaderLoaded { height }))?;
307
308 log::info!(target: "client", "Initializing block filters..");
309
310 let cfheaders_genesis = filter::cache::StoredHeader::genesis(network);
311 let cfheaders_path = dir.join("filters.db");
312 let cfheaders_store = match store::File::create(&cfheaders_path, cfheaders_genesis) {
313 Ok(store) => {
314 log::info!(target: "client", "Initializing new filter header store {:?}", cfheaders_path);
315 store
316 }
317 Err(store::Error::Io(e)) if e.kind() == io::ErrorKind::AlreadyExists => {
318 log::info!(target: "client", "Found existing store {:?}", cfheaders_path);
319 let store = store::File::open(cfheaders_path, cfheaders_genesis)?;
320
321 if store.check().is_err() {
322 log::warn!(target: "client", "Corruption detected in filter store, healing..");
323 store.heal()?; }
325 log::info!(target: "client", "Filters height = {}", store.height()?);
326
327 store
328 }
329 Err(err) => return Err(err.into()),
330 };
331 log::info!(target: "client", "Loading filter headers from store..");
332
333 let filters = FilterCache::load_with(cfheaders_store, |height| {
334 loading.send(Loading::FilterHeaderLoaded { height })
335 })?;
336
337 if config.verify {
338 log::info!(target: "client", "Verifying filter headers..");
339
340 filters.verify_with(network, |height| {
341 loading.send(Loading::FilterHeaderVerified { height })
342 })?; } else {
344 log::info!(target: "client", "Skipping filter header verification (verify = false)")
345 }
346
347 log::info!(target: "client", "Loading peer addresses..");
348
349 let peers_path = dir.join("peers.json");
350 let mut peers = match peer::Cache::create(&peers_path) {
351 Err(e) if e.kind() == io::ErrorKind::AlreadyExists => {
352 log::info!(target: "client", "Found existing peer cache {:?}", peers_path);
353 let cache = peer::Cache::open(&peers_path).map_err(Error::PeerStore)?;
354 let cfpeers = cache
355 .iter()
356 .filter(|(_, ka)| ka.addr.services.has(ServiceFlags::COMPACT_FILTERS))
357 .count();
358
359 log::info!(
360 target: "client",
361 "{} peer(s) found.. {} with compact filters support",
362 cache.len(),
363 cfpeers
364 );
365 cache
366 }
367 Err(err) => {
368 return Err(Error::PeerStore(err));
369 }
370 Ok(cache) => {
371 log::info!(target: "client", "Initializing new peer address cache {:?}", peers_path);
372 cache
373 }
374 };
375
376 log::trace!(target: "client", "{:#?}", peers);
377
378 if config.connect.is_empty() && peers.is_empty() {
379 log::info!(target: "client", "Address book is empty. Trying DNS seeds..");
380 peers.seed(
381 network.seeds().iter().map(|s| (*s, network.port())),
382 Source::Dns,
383 )?;
384 peers.flush()?;
385
386 log::info!(target: "client", "{} seeds added to address book", peers.len());
387 }
388
389 Ok(ClientRunner {
390 listen,
391 commands: self.commands,
392 publisher: self.publisher,
393 reactor: self.reactor,
394 service: Service::new(cache, filters, peers, RefClock::from(clock), rng, config),
395 })
396 }
397
398 pub fn run(self, config: Config) -> Result<(), Error> {
400 self.load(config, LoadingHandler::Ignore)?.run()
401 }
402
403 pub fn run_service<T>(mut self, listen: &[net::SocketAddr], service: T) -> Result<(), Error>
406 where
407 T: nakamoto_net::Service<Event = fsm::Event, Command = Command>,
408 {
409 self.reactor.run::<T, Publisher<fsm::Event>>(
410 listen,
411 service,
412 self.publisher,
413 self.commands,
414 )?;
415
416 Ok(())
417 }
418
419 pub fn handle(&self) -> Handle<R::Waker> {
421 self.handle.clone()
422 }
423}
424
425pub struct Handle<W: Waker> {
427 commands: chan::Sender<Command>,
428 events: event::Subscriber<fsm::Event>,
429 blocks: event::Subscriber<(Block, Height)>,
430 filters: event::Subscriber<(BlockFilter, BlockHash, Height)>,
431 subscriber: event::Subscriber<Event>,
432 waker: W,
433 timeout: time::Duration,
434 shutdown: chan::Sender<()>,
435 listening: chan::Receiver<net::SocketAddr>,
436}
437
438impl<W: Waker> Clone for Handle<W> {
439 fn clone(&self) -> Self {
440 Self {
441 blocks: self.blocks.clone(),
442 commands: self.commands.clone(),
443 events: self.events.clone(),
444 filters: self.filters.clone(),
445 subscriber: self.subscriber.clone(),
446 timeout: self.timeout,
447 waker: self.waker.clone(),
448 shutdown: self.shutdown.clone(),
449 listening: self.listening.clone(),
450 }
451 }
452}
453
454impl<W: Waker> Handle<W> {
455 pub fn listening(&mut self) -> Result<net::SocketAddr, handle::Error> {
457 Ok(self.listening.recv_timeout(self.timeout)?)
458 }
459
460 pub fn set_timeout(&mut self, timeout: time::Duration) {
462 self.timeout = timeout;
463 }
464
465 pub fn get_peers(&self, services: impl Into<ServiceFlags>) -> Result<Vec<Peer>, handle::Error> {
467 let (sender, recvr) = chan::bounded(1);
468 self._command(Command::GetPeers(services.into(), sender))?;
469
470 Ok(recvr.recv()?)
471 }
472
473 pub fn get_block_by_height(
475 &self,
476 height: Height,
477 ) -> Result<Option<BlockHeader>, handle::Error> {
478 let (sender, recvr) = chan::bounded(1);
479 self._command(Command::GetBlockByHeight(height, sender))?;
480
481 Ok(recvr.recv()?)
482 }
483
484 fn _command(&self, cmd: Command) -> Result<(), handle::Error> {
486 self.commands.send(cmd)?;
487 self.waker.wake()?;
488
489 Ok(())
490 }
491}
492
493impl<W: Waker> handle::Handle for Handle<W> {
494 fn get_tip(&self) -> Result<(Height, BlockHeader), handle::Error> {
495 let (transmit, receive) = chan::bounded::<(Height, BlockHeader)>(1);
496 self.command(Command::GetTip(transmit))?;
497
498 Ok(receive.recv()?)
499 }
500
501 fn query_tree(
502 &self,
503 query: impl Fn(&dyn BlockReader) + Send + Sync + 'static,
504 ) -> Result<(), handle::Error> {
505 use std::sync::Arc;
506
507 self.command(Command::QueryTree(Arc::new(query)))?;
508
509 Ok(())
510 }
511
512 fn find_branch(
513 &self,
514 to: &BlockHash,
515 ) -> Result<Option<(Height, NonEmpty<BlockHeader>)>, handle::Error> {
516 let to = *to;
517 let (transmit, receive) = chan::bounded(1);
518
519 self.query_tree(move |t| {
520 transmit.send(t.find_branch(&to)).ok();
521 })?;
522
523 Ok(receive.recv()?)
524 }
525
526 fn get_block(&self, hash: &BlockHash) -> Result<(), handle::Error> {
527 self.command(Command::GetBlock(*hash))?;
528
529 Ok(())
530 }
531
532 fn get_filters(&self, range: RangeInclusive<Height>) -> Result<(), handle::Error> {
533 assert!(
534 !range.is_empty(),
535 "client::Handle::get_filters: range cannot be empty"
536 );
537 let (transmit, receive) = chan::bounded(1);
538 self.command(Command::GetFilters(range, transmit))?;
539
540 receive.recv()?.map_err(handle::Error::GetFilters)
541 }
542
543 fn blocks(&self) -> chan::Receiver<(Block, Height)> {
544 self.blocks.subscribe()
545 }
546
547 fn filters(&self) -> chan::Receiver<(BlockFilter, BlockHash, Height)> {
548 self.filters.subscribe()
549 }
550
551 fn events(&self) -> chan::Receiver<Event> {
552 self.subscriber.subscribe()
553 }
554
555 fn command(&self, cmd: Command) -> Result<(), handle::Error> {
556 self._command(cmd)
557 }
558
559 fn broadcast(
560 &self,
561 msg: NetworkMessage,
562 predicate: fn(Peer) -> bool,
563 ) -> Result<Vec<net::SocketAddr>, handle::Error> {
564 let (transmit, receive) = chan::bounded(1);
565 self.command(Command::Broadcast(msg, predicate, transmit))?;
566
567 Ok(receive.recv()?)
568 }
569
570 fn query(&self, msg: NetworkMessage) -> Result<Option<net::SocketAddr>, handle::Error> {
571 let (transmit, receive) = chan::bounded::<Option<net::SocketAddr>>(1);
572 self.command(Command::Query(msg, transmit))?;
573
574 Ok(receive.recv()?)
575 }
576
577 fn connect(&self, addr: net::SocketAddr) -> Result<Link, handle::Error> {
578 let events = self.events.subscribe();
579 self.command(Command::Connect(addr))?;
580
581 event::wait(
582 &events,
583 |e| match e {
584 fsm::Event::Peer(fsm::PeerEvent::Connected(a, link))
585 if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
586 {
587 Some(link)
588 }
589 _ => None,
590 },
591 self.timeout,
592 )
593 .map_err(handle::Error::from)
594 }
595
596 fn disconnect(&self, addr: net::SocketAddr) -> Result<(), handle::Error> {
597 let events = self.events.subscribe();
598
599 self.command(Command::Disconnect(addr))?;
600 event::wait(
601 &events,
602 |e| match e {
603 fsm::Event::Peer(fsm::PeerEvent::Disconnected(a, _))
604 if a == addr || (addr.ip().is_unspecified() && a.port() == addr.port()) =>
605 {
606 Some(())
607 }
608 _ => None,
609 },
610 self.timeout,
611 )?;
612
613 Ok(())
614 }
615
616 fn import_headers(
617 &self,
618 headers: Vec<BlockHeader>,
619 ) -> Result<Result<ImportResult, tree::Error>, handle::Error> {
620 let (transmit, receive) = chan::bounded::<Result<ImportResult, tree::Error>>(1);
621 self.command(Command::ImportHeaders(headers, transmit))?;
622
623 Ok(receive.recv()?)
624 }
625
626 fn import_addresses(&self, addrs: Vec<Address>) -> Result<(), handle::Error> {
627 self.command(Command::ImportAddresses(addrs))?;
628
629 Ok(())
630 }
631
632 fn submit_transaction(
633 &self,
634 tx: Transaction,
635 ) -> Result<NonEmpty<net::SocketAddr>, handle::Error> {
636 let (transmit, receive) = chan::bounded(1);
637 self.command(Command::SubmitTransaction(tx, transmit))?;
638
639 receive.recv()?.map_err(handle::Error::Command)
640 }
641
642 fn wait<F, T>(&self, f: F) -> Result<T, handle::Error>
643 where
644 F: FnMut(fsm::Event) -> Option<T>,
645 {
646 let events = self.events.subscribe();
647 let result = event::wait(&events, f, self.timeout)?;
648
649 Ok(result)
650 }
651
652 fn wait_for_peers(
653 &self,
654 count: usize,
655 required_services: impl Into<ServiceFlags>,
656 ) -> Result<Vec<(net::SocketAddr, Height, ServiceFlags)>, handle::Error> {
657 let events = self.events.subscribe();
658 let required_services = required_services.into();
659
660 let negotiated = self.get_peers(required_services)?;
661 if negotiated.len() == count {
662 return Ok(negotiated
663 .into_iter()
664 .map(|p| (p.addr, p.height, p.services))
665 .collect());
666 }
667
668 let mut negotiated = negotiated
669 .into_iter()
670 .map(|p| (p.addr, (p.height, p.services)))
671 .collect::<HashMap<_, _>>(); event::wait(
674 &events,
675 |e| match e {
676 fsm::Event::Peer(fsm::PeerEvent::Negotiated {
677 addr,
678 height,
679 services,
680 ..
681 }) => {
682 if services.has(required_services) {
683 negotiated.insert(addr, (height, services));
684 }
685
686 if negotiated.len() == count {
687 Some(negotiated.iter().map(|(a, (h, s))| (*a, *h, *s)).collect())
688 } else {
689 None
690 }
691 }
692 _ => None,
693 },
694 self.timeout,
695 )
696 .map_err(handle::Error::from)
697 }
698
699 fn wait_for_height(&self, h: Height) -> Result<BlockHash, handle::Error> {
700 let events = self.events.subscribe();
701
702 match self.get_block_by_height(h)? {
703 Some(e) => Ok(e.block_hash()),
704 None => event::wait(
705 &events,
706 |e| match e {
707 fsm::Event::Chain(fsm::ChainEvent::Synced(hash, height)) if height == h => {
708 Some(hash)
709 }
710 _ => None,
711 },
712 self.timeout,
713 )
714 .map_err(handle::Error::from),
715 }
716 }
717
718 fn shutdown(self) -> Result<(), handle::Error> {
719 self.shutdown.send(())?;
720 self.waker.wake()?;
721
722 Ok(())
723 }
724}
725
726pub mod traits {
728 pub use crate::handle::Handle;
729}