Skip to main content

tycho_core/node/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8    DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13#[cfg(feature = "cli")]
14pub use self::cli::{CmdRunArgs, CmdRunOnlyArgs, CmdRunStatus, LightNodeConfig, LightNodeContext};
15pub use self::config::NodeBaseConfig;
16pub use self::keys::NodeKeys;
17use crate::block_strider::{
18    ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
19    ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
20    StorageBlockProvider,
21};
22#[cfg(feature = "s3")]
23use crate::blockchain_rpc::S3RpcDataProvider;
24use crate::blockchain_rpc::{
25    BlockchainRpcClient, BlockchainRpcService, BroadcastListener, IntoRpcDataProvider,
26    SelfBroadcastListener, StorageRpcDataProvider,
27};
28use crate::global_config::{GlobalConfig, ZerostateId};
29use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
30#[cfg(feature = "s3")]
31use crate::s3::S3Client;
32use crate::storage::{CoreStorage, CoreStorageConfig};
33
34#[cfg(feature = "cli")]
35mod cli;
36mod config;
37mod keys;
38
39pub struct NodeBootArgs {
40    /// Default: [`ColdBootType::LatestPersistent`].
41    pub boot_type: ColdBootType,
42    /// Default: None
43    pub zerostates: Option<Vec<PathBuf>>,
44    /// Default: None
45    pub queue_state_handler: Option<Box<dyn QueueStateHandler>>,
46    /// Default: false
47    pub ignore_states: bool,
48}
49
50impl Default for NodeBootArgs {
51    #[inline]
52    fn default() -> Self {
53        Self {
54            boot_type: ColdBootType::LatestPersistent,
55            zerostates: None,
56            queue_state_handler: None,
57            ignore_states: false,
58        }
59    }
60}
61
62pub struct NodeBase {
63    pub base_config: NodeBaseConfig,
64    pub global_config: GlobalConfig,
65    pub initial_peer_count: usize,
66
67    pub keypair: Arc<ed25519::KeyPair>,
68    pub network: Network,
69    pub dht_client: DhtClient,
70    pub peer_resolver: PeerResolver,
71    pub overlay_service: OverlayService,
72
73    pub storage_context: StorageContext,
74    pub core_storage: CoreStorage,
75
76    pub blockchain_rpc_client: BlockchainRpcClient,
77
78    #[cfg(feature = "s3")]
79    pub s3_client: Option<S3Client>,
80}
81
82impl NodeBase {
83    const DEFAULT_INITIAL_PEER_COUNT: usize = 3;
84
85    pub fn builder<'a>(
86        base_config: &'a NodeBaseConfig,
87        global_config: &'a GlobalConfig,
88    ) -> NodeBaseBuilder<'a, ()> {
89        crate::record_version_metric();
90        NodeBaseBuilder::new(base_config, global_config)
91    }
92
93    /// Wait for some peers and boot the node.
94    pub async fn init(
95        &self,
96        boot_type: ColdBootType,
97        import_zerostate: Option<Vec<PathBuf>>,
98        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
99    ) -> Result<BlockId> {
100        self.init_ext(NodeBootArgs {
101            boot_type,
102            zerostates: import_zerostate,
103            queue_state_handler,
104            ..Default::default()
105        })
106        .await
107    }
108
109    /// Wait for some peers and boot the node.
110    pub async fn init_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
111        self.wait_for_neighbours(self.initial_peer_count).await;
112
113        let init_block_id = self.boot_ext(args).await.context("failed to init node")?;
114        tracing::info!(%init_block_id, "node initialized");
115
116        Ok(init_block_id)
117    }
118
119    /// Wait for at least `count` public overlay peers to resolve.
120    pub async fn wait_for_neighbours(&self, count: usize) {
121        // Ensure that there are some neighbours
122        tracing::info!("waiting for initial neighbours");
123        self.blockchain_rpc_client
124            .overlay_client()
125            .neighbours()
126            .wait_for_peers(count)
127            .await;
128        tracing::info!("found initial neighbours");
129    }
130
131    /// Initialize the node and return the init block id.
132    pub async fn boot(
133        &self,
134        boot_type: ColdBootType,
135        zerostates: Option<Vec<PathBuf>>,
136        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
137    ) -> Result<BlockId> {
138        self.boot_ext(NodeBootArgs {
139            boot_type,
140            zerostates,
141            queue_state_handler,
142            ..Default::default()
143        })
144        .await
145    }
146
147    /// Initialize the node and return the init block id.
148    pub async fn boot_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
149        let node_state = self.core_storage.node_state();
150
151        let last_mc_block_id = match node_state.load_last_mc_block_id() {
152            Some(block_id) => block_id,
153            None => {
154                let mut starter = Starter::builder()
155                    .with_storage(self.core_storage.clone())
156                    .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
157                    .with_zerostate_id(self.global_config.zerostate)
158                    .with_config(self.base_config.starter.clone())
159                    .ignore_states(args.ignore_states);
160
161                if let Some(handler) = args.queue_state_handler {
162                    starter = starter.with_queue_state_handler(handler);
163                }
164
165                #[cfg(feature = "s3")]
166                if let Some(s3_client) = self.s3_client.as_ref() {
167                    starter = starter.with_s3_client(s3_client.clone());
168                }
169
170                starter
171                    .build()
172                    .cold_boot(args.boot_type, args.zerostates.map(FileZerostateProvider))
173                    .await?
174            }
175        };
176
177        tracing::info!(
178            %last_mc_block_id,
179            "boot finished"
180        );
181
182        Ok(last_mc_block_id)
183    }
184
185    pub fn validator_resolver(&self) -> &ValidatorsResolver {
186        self.blockchain_rpc_client
187            .overlay_client()
188            .validators_resolver()
189    }
190
191    /// Update current validator targets with the specified set.
192    pub fn update_validator_set(&self, vset: &ValidatorSet) {
193        self.validator_resolver().update_validator_set(vset);
194    }
195
196    /// Update current validator targets using the validator set from the provider
197    pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
198        // notify subscriber with an initial validators list
199        let mc_state = self
200            .core_storage
201            .shard_state_storage()
202            .load_state(block_id.seqno, block_id)
203            .await
204            .context("update_validator_set_from_shard_state failed to load state")?;
205
206        let config = mc_state.config_params()?;
207        let current_vset = config.get_current_validator_set()?;
208        self.update_validator_set(&current_vset);
209        Ok(())
210    }
211
212    pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
213        ArchiveBlockProvider::new(
214            (
215                self.blockchain_rpc_client.clone(),
216                #[cfg(feature = "s3")]
217                self.s3_client.clone(),
218            ),
219            self.core_storage.clone(),
220            self.base_config.archive_block_provider.clone(),
221        )
222    }
223
224    pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
225        BlockchainBlockProvider::new(
226            self.blockchain_rpc_client.clone(),
227            self.core_storage.clone(),
228            self.base_config.blockchain_block_provider.clone(),
229        )
230    }
231
232    pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
233        StorageBlockProvider::new(self.core_storage.clone())
234    }
235
236    /// Creates a new [`BlockStrider`] using options from the base config.
237    pub fn build_strider<P, S>(
238        &self,
239        provider: P,
240        subscriber: S,
241    ) -> BlockStrider<PersistentBlockStriderState, P, S>
242    where
243        P: BlockProvider,
244        S: BlockSubscriber,
245    {
246        let state = PersistentBlockStriderState::new(
247            self.global_config.zerostate.as_block_id(),
248            self.core_storage.clone(),
249        );
250
251        BlockStrider::builder()
252            .with_state(state)
253            .with_provider(provider)
254            .with_block_subscriber(subscriber)
255            .build()
256    }
257}
258
259pub struct NodeBaseBuilder<'a, Step = ()> {
260    common: NodeBaseBuilderCommon<'a>,
261    step: Step,
262}
263
264impl<'a> NodeBaseBuilder<'a, ()> {
265    pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
266        Self {
267            common: NodeBaseBuilderCommon {
268                base_config,
269                global_config,
270                initial_peer_count: NodeBase::DEFAULT_INITIAL_PEER_COUNT,
271            },
272            step: (),
273        }
274    }
275
276    pub fn init_network(
277        self,
278        public_addr: SocketAddr,
279        secret_key: &ed25519::SecretKey,
280    ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
281        let net = ConfiguredNetwork::new(
282            public_addr,
283            secret_key,
284            self.common.base_config,
285            &self.common.global_config.bootstrap_peers,
286        )?;
287
288        Ok(NodeBaseBuilder {
289            common: self.common,
290            step: init::Step0 { net },
291        })
292    }
293}
294
295impl<'a> NodeBaseBuilder<'a, init::Step0> {
296    // TODO: Add some options here if needed.
297    pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
298        let store = ConfiguredStorage::new(
299            &self.common.base_config.storage,
300            &self.common.base_config.core_storage,
301        )
302        .await?;
303
304        Ok(NodeBaseBuilder {
305            common: self.common,
306            step: init::Step1 {
307                prev_step: self.step,
308                store,
309            },
310        })
311    }
312}
313
314impl<'a> NodeBaseBuilder<'a, init::Step1> {
315    pub fn init_blockchain_rpc<RL, SL>(
316        self,
317        remote_broadcast_listener: RL,
318        self_broadcast_listener: SL,
319    ) -> Result<NodeBaseBuilder<'a, init::Step2>>
320    where
321        RL: BroadcastListener,
322        SL: SelfBroadcastListener,
323    {
324        #[cfg(feature = "s3")]
325        let s3_client = self
326            .common
327            .base_config
328            .s3_client
329            .as_ref()
330            .map(S3Client::new)
331            .transpose()
332            .context("failed to create S3 client")?;
333
334        let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
335            &self.common.global_config.zerostate,
336            self.step.store.core_storage.clone(),
337            (
338                StorageRpcDataProvider::new(self.step.store.core_storage.clone()),
339                #[cfg(feature = "s3")]
340                s3_client.clone().and_then(|s3_client| {
341                    let proxy_config = self
342                        .common
343                        .base_config
344                        .blockchain_rpc_service
345                        .s3_proxy
346                        .as_ref()?; // when s3_proxy = None - ignore rpc provider
347                    let storage = self.step.store.core_storage.clone();
348                    Some(S3RpcDataProvider::new(s3_client, storage, proxy_config))
349                }),
350            ),
351            remote_broadcast_listener,
352            self_broadcast_listener,
353            self.common.base_config,
354        );
355
356        Ok(NodeBaseBuilder {
357            common: self.common,
358            step: init::Step2 {
359                prev_step: self.step,
360                blockchain_rpc_client,
361                #[cfg(feature = "s3")]
362                s3_client,
363            },
364        })
365    }
366}
367
368impl<'a> NodeBaseBuilder<'a, init::Final> {
369    pub fn build(self) -> Result<NodeBase> {
370        let net = self.step.prev_step.prev_step.net;
371        let store = self.step.prev_step.store;
372        let blockchain_rpc_client = self.step.blockchain_rpc_client;
373        #[cfg(feature = "s3")]
374        let s3_client = self.step.s3_client;
375
376        Ok(NodeBase {
377            base_config: self.common.base_config.clone(),
378            global_config: self.common.global_config.clone(),
379            initial_peer_count: self.common.initial_peer_count,
380            keypair: net.keypair,
381            network: net.network,
382            dht_client: net.dht_client,
383            peer_resolver: net.peer_resolver,
384            overlay_service: net.overlay_service,
385            storage_context: store.context,
386            core_storage: store.core_storage,
387            blockchain_rpc_client,
388            #[cfg(feature = "s3")]
389            s3_client,
390        })
391    }
392}
393
394impl<'a, Step> NodeBaseBuilder<'a, Step> {
395    pub fn base_config(&self) -> &'a NodeBaseConfig {
396        self.common.base_config
397    }
398
399    pub fn global_config(&self) -> &'a GlobalConfig {
400        self.common.global_config
401    }
402
403    pub fn initial_peer_count(&self) -> usize {
404        self.common.initial_peer_count
405    }
406
407    pub fn with_initial_peer_count(mut self, count: usize) -> Self {
408        self.common.initial_peer_count = count;
409        self
410    }
411}
412
413impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
414    pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
415        &self.step.as_ref().net.keypair
416    }
417
418    pub fn network(&self) -> &Network {
419        &self.step.as_ref().net.network
420    }
421
422    pub fn dht_client(&self) -> &DhtClient {
423        &self.step.as_ref().net.dht_client
424    }
425
426    pub fn peer_resolver(&self) -> &PeerResolver {
427        &self.step.as_ref().net.peer_resolver
428    }
429
430    pub fn overlay_service(&self) -> &OverlayService {
431        &self.step.as_ref().net.overlay_service
432    }
433}
434
435impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
436    pub fn storage_context(&self) -> &StorageContext {
437        &self.step.as_ref().store.context
438    }
439
440    pub fn core_storage(&self) -> &CoreStorage {
441        &self.step.as_ref().store.core_storage
442    }
443}
444
445impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
446    pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
447        &self.step.as_ref().blockchain_rpc_client
448    }
449}
450
451struct NodeBaseBuilderCommon<'a> {
452    base_config: &'a NodeBaseConfig,
453    global_config: &'a GlobalConfig,
454    initial_peer_count: usize,
455}
456
457pub mod init {
458    use super::*;
459
460    pub type Final = Step2;
461
462    /// Node with network.
463    pub struct Step0 {
464        pub(super) net: ConfiguredNetwork,
465    }
466
467    impl AsRef<Step0> for Step0 {
468        #[inline]
469        fn as_ref(&self) -> &Step0 {
470            self
471        }
472    }
473
474    /// Node with network and storage.
475    pub struct Step1 {
476        pub(super) prev_step: Step0,
477        pub(super) store: ConfiguredStorage,
478    }
479
480    impl AsRef<Step0> for Step1 {
481        #[inline]
482        fn as_ref(&self) -> &Step0 {
483            &self.prev_step
484        }
485    }
486
487    impl AsRef<Step1> for Step1 {
488        #[inline]
489        fn as_ref(&self) -> &Step1 {
490            self
491        }
492    }
493
494    /// Node with network, storage and public overlay.
495    pub struct Step2 {
496        pub(super) prev_step: Step1,
497        pub(super) blockchain_rpc_client: BlockchainRpcClient,
498        #[cfg(feature = "s3")]
499        pub(super) s3_client: Option<S3Client>,
500    }
501
502    impl AsRef<Step0> for Step2 {
503        #[inline]
504        fn as_ref(&self) -> &Step0 {
505            &self.prev_step.prev_step
506        }
507    }
508
509    impl AsRef<Step1> for Step2 {
510        #[inline]
511        fn as_ref(&self) -> &Step1 {
512            &self.prev_step
513        }
514    }
515
516    impl AsRef<Step2> for Step2 {
517        #[inline]
518        fn as_ref(&self) -> &Step2 {
519            self
520        }
521    }
522}
523
524pub struct ConfiguredNetwork {
525    pub keypair: Arc<ed25519::KeyPair>,
526    pub network: Network,
527    pub dht_client: DhtClient,
528    pub peer_resolver: PeerResolver,
529    pub overlay_service: OverlayService,
530}
531
532impl ConfiguredNetwork {
533    pub fn new(
534        public_addr: SocketAddr,
535        secret_key: &ed25519::SecretKey,
536        base_config: &NodeBaseConfig,
537        bootstrap_peers: &[PeerInfo],
538    ) -> Result<Self> {
539        // Setup network
540        let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
541        let local_id = keypair.public_key.into();
542
543        let (dht_tasks, dht_service) = DhtService::builder(local_id)
544            .with_config(base_config.dht.clone())
545            .build();
546
547        let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
548            .with_config(base_config.overlay.clone())
549            .with_dht_service(dht_service.clone())
550            .build();
551
552        let router = Router::builder()
553            .route(dht_service.clone())
554            .route(overlay_service.clone())
555            .build();
556
557        let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
558
559        let network = Network::builder()
560            .with_config(base_config.network.clone())
561            .with_private_key(secret_key.to_bytes())
562            .with_remote_addr(public_addr)
563            .build(local_addr, router)
564            .context("failed to build node network")?;
565
566        let bootstrap_peer_count = dht_tasks.spawn(&network, bootstrap_peers)?;
567        overlay_tasks.spawn(&network);
568
569        let dht_client = dht_service.make_client(&network);
570        let peer_resolver = dht_service
571            .make_peer_resolver()
572            .with_config(base_config.peer_resolver.clone())
573            .build(&network);
574
575        tracing::info!(
576            %local_id,
577            %local_addr,
578            %public_addr,
579            bootstrap_peers = bootstrap_peer_count,
580            "initialized network"
581        );
582
583        Ok(Self {
584            keypair,
585            network,
586            dht_client,
587            peer_resolver,
588            overlay_service,
589        })
590    }
591
592    pub fn add_blockchain_rpc<BL, SL, RP>(
593        &self,
594        zerostate: &ZerostateId,
595        storage: CoreStorage,
596        rpc_provider: RP,
597        remote_broadcast_listener: BL,
598        self_broadcast_listener: SL,
599        base_config: &NodeBaseConfig,
600    ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
601    where
602        BL: BroadcastListener,
603        SL: SelfBroadcastListener,
604        RP: IntoRpcDataProvider,
605    {
606        let blockchain_rpc_service = BlockchainRpcService::builder()
607            .with_config(base_config.blockchain_rpc_service.clone())
608            .with_storage(storage)
609            .with_data_provider(rpc_provider)
610            .with_broadcast_listener(remote_broadcast_listener)
611            .build();
612
613        let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
614            .named("blockchain_rpc")
615            .with_peer_resolver(self.peer_resolver.clone())
616            .build(blockchain_rpc_service.clone());
617        self.overlay_service.add_public_overlay(&public_overlay);
618
619        let blockchain_rpc_client = BlockchainRpcClient::builder()
620            .with_config(base_config.blockchain_rpc_client.clone())
621            .with_public_overlay_client(PublicOverlayClient::new(
622                self.network.clone(),
623                public_overlay,
624                base_config.public_overlay_client.clone(),
625            ))
626            .with_self_broadcast_listener(self_broadcast_listener)
627            .build();
628
629        tracing::info!(
630            overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
631            "initialized blockchain rpc"
632        );
633
634        (blockchain_rpc_service, blockchain_rpc_client)
635    }
636}
637
638pub struct ConfiguredStorage {
639    pub context: StorageContext,
640    pub core_storage: CoreStorage,
641}
642
643impl ConfiguredStorage {
644    pub async fn new(
645        storage_config: &StorageConfig,
646        core_storage_config: &CoreStorageConfig,
647    ) -> Result<Self> {
648        let context = StorageContext::new(storage_config.clone())
649            .await
650            .context("failed to create storage context")?;
651        let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
652            .await
653            .context("failed to create storage")?;
654        tracing::info!(
655            root_dir = %core_storage.context().root_dir().path().display(),
656            "initialized storage"
657        );
658
659        Ok(Self {
660            context,
661            core_storage,
662        })
663    }
664}