Skip to main content

tycho_core/node/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8    DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13#[cfg(feature = "cli")]
14pub use self::cli::{CmdRunArgs, CmdRunOnlyArgs, CmdRunStatus, LightNodeConfig, LightNodeContext};
15pub use self::config::NodeBaseConfig;
16pub use self::keys::NodeKeys;
17use crate::block_strider::{
18    ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
19    ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
20    StorageBlockProvider,
21};
22use crate::blockchain_rpc::{
23    BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
24};
25use crate::global_config::{GlobalConfig, ZerostateId};
26use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
27#[cfg(feature = "s3")]
28use crate::s3::S3Client;
29use crate::storage::{CoreStorage, CoreStorageConfig};
30
31#[cfg(feature = "cli")]
32mod cli;
33mod config;
34mod keys;
35
36pub struct NodeBootArgs {
37    /// Default: [`ColdBootType::LatestPersistent`].
38    pub boot_type: ColdBootType,
39    /// Default: None
40    pub zerostates: Option<Vec<PathBuf>>,
41    /// Default: None
42    pub queue_state_handler: Option<Box<dyn QueueStateHandler>>,
43    /// Default: false
44    pub ignore_states: bool,
45}
46
47impl Default for NodeBootArgs {
48    #[inline]
49    fn default() -> Self {
50        Self {
51            boot_type: ColdBootType::LatestPersistent,
52            zerostates: None,
53            queue_state_handler: None,
54            ignore_states: false,
55        }
56    }
57}
58
59pub struct NodeBase {
60    pub base_config: NodeBaseConfig,
61    pub global_config: GlobalConfig,
62    pub initial_peer_count: usize,
63
64    pub keypair: Arc<ed25519::KeyPair>,
65    pub network: Network,
66    pub dht_client: DhtClient,
67    pub peer_resolver: PeerResolver,
68    pub overlay_service: OverlayService,
69
70    pub storage_context: StorageContext,
71    pub core_storage: CoreStorage,
72
73    pub blockchain_rpc_client: BlockchainRpcClient,
74
75    #[cfg(feature = "s3")]
76    pub s3_client: Option<S3Client>,
77}
78
79impl NodeBase {
80    const DEFAULT_INITIAL_PEER_COUNT: usize = 3;
81
82    pub fn builder<'a>(
83        base_config: &'a NodeBaseConfig,
84        global_config: &'a GlobalConfig,
85    ) -> NodeBaseBuilder<'a, ()> {
86        crate::record_version_metric();
87        NodeBaseBuilder::new(base_config, global_config)
88    }
89
90    /// Wait for some peers and boot the node.
91    pub async fn init(
92        &self,
93        boot_type: ColdBootType,
94        import_zerostate: Option<Vec<PathBuf>>,
95        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
96    ) -> Result<BlockId> {
97        self.init_ext(NodeBootArgs {
98            boot_type,
99            zerostates: import_zerostate,
100            queue_state_handler,
101            ..Default::default()
102        })
103        .await
104    }
105
106    /// Wait for some peers and boot the node.
107    pub async fn init_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
108        self.wait_for_neighbours(self.initial_peer_count).await;
109
110        let init_block_id = self.boot_ext(args).await.context("failed to init node")?;
111        tracing::info!(%init_block_id, "node initialized");
112
113        Ok(init_block_id)
114    }
115
116    /// Wait for at least `count` public overlay peers to resolve.
117    pub async fn wait_for_neighbours(&self, count: usize) {
118        // Ensure that there are some neighbours
119        tracing::info!("waiting for initial neighbours");
120        self.blockchain_rpc_client
121            .overlay_client()
122            .neighbours()
123            .wait_for_peers(count)
124            .await;
125        tracing::info!("found initial neighbours");
126    }
127
128    /// Initialize the node and return the init block id.
129    pub async fn boot(
130        &self,
131        boot_type: ColdBootType,
132        zerostates: Option<Vec<PathBuf>>,
133        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
134    ) -> Result<BlockId> {
135        self.boot_ext(NodeBootArgs {
136            boot_type,
137            zerostates,
138            queue_state_handler,
139            ..Default::default()
140        })
141        .await
142    }
143
144    /// Initialize the node and return the init block id.
145    pub async fn boot_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
146        let node_state = self.core_storage.node_state();
147
148        let last_mc_block_id = match node_state.load_last_mc_block_id() {
149            Some(block_id) => block_id,
150            None => {
151                let mut starter = Starter::builder()
152                    .with_storage(self.core_storage.clone())
153                    .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
154                    .with_zerostate_id(self.global_config.zerostate)
155                    .with_config(self.base_config.starter.clone())
156                    .ignore_states(args.ignore_states);
157
158                if let Some(handler) = args.queue_state_handler {
159                    starter = starter.with_queue_state_handler(handler);
160                }
161
162                #[cfg(feature = "s3")]
163                if let Some(s3_client) = self.s3_client.as_ref() {
164                    starter = starter.with_s3_client(s3_client.clone());
165                }
166
167                starter
168                    .build()
169                    .cold_boot(args.boot_type, args.zerostates.map(FileZerostateProvider))
170                    .await?
171            }
172        };
173
174        tracing::info!(
175            %last_mc_block_id,
176            "boot finished"
177        );
178
179        Ok(last_mc_block_id)
180    }
181
182    pub fn validator_resolver(&self) -> &ValidatorsResolver {
183        self.blockchain_rpc_client
184            .overlay_client()
185            .validators_resolver()
186    }
187
188    /// Update current validator targets with the specified set.
189    pub fn update_validator_set(&self, vset: &ValidatorSet) {
190        self.validator_resolver().update_validator_set(vset);
191    }
192
193    /// Update current validator targets using the validator set from the provider
194    pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
195        // notify subscriber with an initial validators list
196        let mc_state = self
197            .core_storage
198            .shard_state_storage()
199            .load_state(block_id.seqno, block_id)
200            .await
201            .context("update_validator_set_from_shard_state failed to load state")?;
202
203        let config = mc_state.config_params()?;
204        let current_vset = config.get_current_validator_set()?;
205        self.update_validator_set(&current_vset);
206        Ok(())
207    }
208
209    pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
210        ArchiveBlockProvider::new(
211            (
212                self.blockchain_rpc_client.clone(),
213                #[cfg(feature = "s3")]
214                self.s3_client.clone(),
215            ),
216            self.core_storage.clone(),
217            self.base_config.archive_block_provider.clone(),
218        )
219    }
220
221    pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
222        BlockchainBlockProvider::new(
223            self.blockchain_rpc_client.clone(),
224            self.core_storage.clone(),
225            self.base_config.blockchain_block_provider.clone(),
226        )
227    }
228
229    pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
230        StorageBlockProvider::new(self.core_storage.clone())
231    }
232
233    /// Creates a new [`BlockStrider`] using options from the base config.
234    pub fn build_strider<P, S>(
235        &self,
236        provider: P,
237        subscriber: S,
238    ) -> BlockStrider<PersistentBlockStriderState, P, S>
239    where
240        P: BlockProvider,
241        S: BlockSubscriber,
242    {
243        let state = PersistentBlockStriderState::new(
244            self.global_config.zerostate.as_block_id(),
245            self.core_storage.clone(),
246        );
247
248        BlockStrider::builder()
249            .with_state(state)
250            .with_provider(provider)
251            .with_block_subscriber(subscriber)
252            .build()
253    }
254}
255
256pub struct NodeBaseBuilder<'a, Step = ()> {
257    common: NodeBaseBuilderCommon<'a>,
258    step: Step,
259}
260
261impl<'a> NodeBaseBuilder<'a, ()> {
262    pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
263        Self {
264            common: NodeBaseBuilderCommon {
265                base_config,
266                global_config,
267                initial_peer_count: NodeBase::DEFAULT_INITIAL_PEER_COUNT,
268            },
269            step: (),
270        }
271    }
272
273    pub fn init_network(
274        self,
275        public_addr: SocketAddr,
276        secret_key: &ed25519::SecretKey,
277    ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
278        let net = ConfiguredNetwork::new(
279            public_addr,
280            secret_key,
281            self.common.base_config,
282            &self.common.global_config.bootstrap_peers,
283        )?;
284
285        Ok(NodeBaseBuilder {
286            common: self.common,
287            step: init::Step0 { net },
288        })
289    }
290}
291
292impl<'a> NodeBaseBuilder<'a, init::Step0> {
293    // TODO: Add some options here if needed.
294    pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
295        let store = ConfiguredStorage::new(
296            &self.common.base_config.storage,
297            &self.common.base_config.core_storage,
298        )
299        .await?;
300
301        Ok(NodeBaseBuilder {
302            common: self.common,
303            step: init::Step1 {
304                prev_step: self.step,
305                store,
306            },
307        })
308    }
309}
310
311impl<'a> NodeBaseBuilder<'a, init::Step1> {
312    pub fn init_blockchain_rpc<RL, SL>(
313        self,
314        remote_broadcast_listener: RL,
315        self_broadcast_listener: SL,
316    ) -> Result<NodeBaseBuilder<'a, init::Step2>>
317    where
318        RL: BroadcastListener,
319        SL: SelfBroadcastListener,
320    {
321        let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
322            &self.common.global_config.zerostate,
323            self.step.store.core_storage.clone(),
324            remote_broadcast_listener,
325            self_broadcast_listener,
326            self.common.base_config,
327        );
328
329        Ok(NodeBaseBuilder {
330            common: self.common,
331            step: init::Step2 {
332                prev_step: self.step,
333                blockchain_rpc_client,
334            },
335        })
336    }
337}
338
339impl<'a> NodeBaseBuilder<'a, init::Final> {
340    pub fn build(self) -> Result<NodeBase> {
341        let net = self.step.prev_step.prev_step.net;
342        let store = self.step.prev_step.store;
343        let blockchain_rpc_client = self.step.blockchain_rpc_client;
344
345        Ok(NodeBase {
346            base_config: self.common.base_config.clone(),
347            global_config: self.common.global_config.clone(),
348            initial_peer_count: self.common.initial_peer_count,
349            keypair: net.keypair,
350            network: net.network,
351            dht_client: net.dht_client,
352            peer_resolver: net.peer_resolver,
353            overlay_service: net.overlay_service,
354            storage_context: store.context,
355            core_storage: store.core_storage,
356            blockchain_rpc_client,
357            #[cfg(feature = "s3")]
358            s3_client: self
359                .common
360                .base_config
361                .s3_client
362                .as_ref()
363                .map(S3Client::new)
364                .transpose()
365                .context("failed to create S3 client")?,
366        })
367    }
368}
369
370impl<'a, Step> NodeBaseBuilder<'a, Step> {
371    pub fn base_config(&self) -> &'a NodeBaseConfig {
372        self.common.base_config
373    }
374
375    pub fn global_config(&self) -> &'a GlobalConfig {
376        self.common.global_config
377    }
378
379    pub fn initial_peer_count(&self) -> usize {
380        self.common.initial_peer_count
381    }
382
383    pub fn with_initial_peer_count(mut self, count: usize) -> Self {
384        self.common.initial_peer_count = count;
385        self
386    }
387}
388
389impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
390    pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
391        &self.step.as_ref().net.keypair
392    }
393
394    pub fn network(&self) -> &Network {
395        &self.step.as_ref().net.network
396    }
397
398    pub fn dht_client(&self) -> &DhtClient {
399        &self.step.as_ref().net.dht_client
400    }
401
402    pub fn peer_resolver(&self) -> &PeerResolver {
403        &self.step.as_ref().net.peer_resolver
404    }
405
406    pub fn overlay_service(&self) -> &OverlayService {
407        &self.step.as_ref().net.overlay_service
408    }
409}
410
411impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
412    pub fn storage_context(&self) -> &StorageContext {
413        &self.step.as_ref().store.context
414    }
415
416    pub fn core_storage(&self) -> &CoreStorage {
417        &self.step.as_ref().store.core_storage
418    }
419}
420
421impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
422    pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
423        &self.step.as_ref().blockchain_rpc_client
424    }
425}
426
427struct NodeBaseBuilderCommon<'a> {
428    base_config: &'a NodeBaseConfig,
429    global_config: &'a GlobalConfig,
430    initial_peer_count: usize,
431}
432
433pub mod init {
434    use super::*;
435
436    pub type Final = Step2;
437
438    /// Node with network.
439    pub struct Step0 {
440        pub(super) net: ConfiguredNetwork,
441    }
442
443    impl AsRef<Step0> for Step0 {
444        #[inline]
445        fn as_ref(&self) -> &Step0 {
446            self
447        }
448    }
449
450    /// Node with network and storage.
451    pub struct Step1 {
452        pub(super) prev_step: Step0,
453        pub(super) store: ConfiguredStorage,
454    }
455
456    impl AsRef<Step0> for Step1 {
457        #[inline]
458        fn as_ref(&self) -> &Step0 {
459            &self.prev_step
460        }
461    }
462
463    impl AsRef<Step1> for Step1 {
464        #[inline]
465        fn as_ref(&self) -> &Step1 {
466            self
467        }
468    }
469
470    /// Node with network, storage and public overlay.
471    pub struct Step2 {
472        pub(super) prev_step: Step1,
473        pub(super) blockchain_rpc_client: BlockchainRpcClient,
474    }
475
476    impl AsRef<Step0> for Step2 {
477        #[inline]
478        fn as_ref(&self) -> &Step0 {
479            &self.prev_step.prev_step
480        }
481    }
482
483    impl AsRef<Step1> for Step2 {
484        #[inline]
485        fn as_ref(&self) -> &Step1 {
486            &self.prev_step
487        }
488    }
489
490    impl AsRef<Step2> for Step2 {
491        #[inline]
492        fn as_ref(&self) -> &Step2 {
493            self
494        }
495    }
496}
497
498pub struct ConfiguredNetwork {
499    pub keypair: Arc<ed25519::KeyPair>,
500    pub network: Network,
501    pub dht_client: DhtClient,
502    pub peer_resolver: PeerResolver,
503    pub overlay_service: OverlayService,
504}
505
506impl ConfiguredNetwork {
507    pub fn new(
508        public_addr: SocketAddr,
509        secret_key: &ed25519::SecretKey,
510        base_config: &NodeBaseConfig,
511        bootstrap_peers: &[PeerInfo],
512    ) -> Result<Self> {
513        // Setup network
514        let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
515        let local_id = keypair.public_key.into();
516
517        let (dht_tasks, dht_service) = DhtService::builder(local_id)
518            .with_config(base_config.dht.clone())
519            .build();
520
521        let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
522            .with_config(base_config.overlay.clone())
523            .with_dht_service(dht_service.clone())
524            .build();
525
526        let router = Router::builder()
527            .route(dht_service.clone())
528            .route(overlay_service.clone())
529            .build();
530
531        let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
532
533        let network = Network::builder()
534            .with_config(base_config.network.clone())
535            .with_private_key(secret_key.to_bytes())
536            .with_remote_addr(public_addr)
537            .build(local_addr, router)
538            .context("failed to build node network")?;
539
540        let bootstrap_peer_count = dht_tasks.spawn(&network, bootstrap_peers)?;
541        overlay_tasks.spawn(&network);
542
543        let dht_client = dht_service.make_client(&network);
544        let peer_resolver = dht_service
545            .make_peer_resolver()
546            .with_config(base_config.peer_resolver.clone())
547            .build(&network);
548
549        tracing::info!(
550            %local_id,
551            %local_addr,
552            %public_addr,
553            bootstrap_peers = bootstrap_peer_count,
554            "initialized network"
555        );
556
557        Ok(Self {
558            keypair,
559            network,
560            dht_client,
561            peer_resolver,
562            overlay_service,
563        })
564    }
565
566    pub fn add_blockchain_rpc<BL, SL>(
567        &self,
568        zerostate: &ZerostateId,
569        storage: CoreStorage,
570        remote_broadcast_listener: BL,
571        self_broadcast_listener: SL,
572        base_config: &NodeBaseConfig,
573    ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
574    where
575        BL: BroadcastListener,
576        SL: SelfBroadcastListener,
577    {
578        let blockchain_rpc_service = BlockchainRpcService::builder()
579            .with_config(base_config.blockchain_rpc_service.clone())
580            .with_storage(storage)
581            .with_broadcast_listener(remote_broadcast_listener)
582            .build();
583
584        let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
585            .named("blockchain_rpc")
586            .with_peer_resolver(self.peer_resolver.clone())
587            .build(blockchain_rpc_service.clone());
588        self.overlay_service.add_public_overlay(&public_overlay);
589
590        let blockchain_rpc_client = BlockchainRpcClient::builder()
591            .with_config(base_config.blockchain_rpc_client.clone())
592            .with_public_overlay_client(PublicOverlayClient::new(
593                self.network.clone(),
594                public_overlay,
595                base_config.public_overlay_client.clone(),
596            ))
597            .with_self_broadcast_listener(self_broadcast_listener)
598            .build();
599
600        tracing::info!(
601            overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
602            "initialized blockchain rpc"
603        );
604
605        (blockchain_rpc_service, blockchain_rpc_client)
606    }
607}
608
609pub struct ConfiguredStorage {
610    pub context: StorageContext,
611    pub core_storage: CoreStorage,
612}
613
614impl ConfiguredStorage {
615    pub async fn new(
616        storage_config: &StorageConfig,
617        core_storage_config: &CoreStorageConfig,
618    ) -> Result<Self> {
619        let context = StorageContext::new(storage_config.clone())
620            .await
621            .context("failed to create storage context")?;
622        let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
623            .await
624            .context("failed to create storage")?;
625        tracing::info!(
626            root_dir = %core_storage.context().root_dir().path().display(),
627            "initialized storage"
628        );
629
630        Ok(Self {
631            context,
632            core_storage,
633        })
634    }
635}