tycho_core/node/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8    DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13#[cfg(feature = "cli")]
14pub use self::cli::{CmdRunArgs, CmdRunOnlyArgs, CmdRunStatus, LightNodeConfig, LightNodeContext};
15pub use self::config::NodeBaseConfig;
16pub use self::keys::NodeKeys;
17use crate::block_strider::{
18    ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
19    ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
20    StorageBlockProvider,
21};
22use crate::blockchain_rpc::{
23    BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
24};
25use crate::global_config::{GlobalConfig, ZerostateId};
26use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
27#[cfg(feature = "s3")]
28use crate::s3::S3Client;
29use crate::storage::{CoreStorage, CoreStorageConfig};
30
31#[cfg(feature = "cli")]
32mod cli;
33mod config;
34mod keys;
35
36pub struct NodeBootArgs {
37    /// Default: [`ColdBootType::LatestPersistent`].
38    pub boot_type: ColdBootType,
39    /// Default: None
40    pub zerostates: Option<Vec<PathBuf>>,
41    /// Default: None
42    pub queue_state_handler: Option<Box<dyn QueueStateHandler>>,
43    /// Default: false
44    pub ignore_states: bool,
45}
46
47impl Default for NodeBootArgs {
48    #[inline]
49    fn default() -> Self {
50        Self {
51            boot_type: ColdBootType::LatestPersistent,
52            zerostates: None,
53            queue_state_handler: None,
54            ignore_states: false,
55        }
56    }
57}
58
59pub struct NodeBase {
60    pub base_config: NodeBaseConfig,
61    pub global_config: GlobalConfig,
62    pub initial_peer_count: usize,
63
64    pub keypair: Arc<ed25519::KeyPair>,
65    pub network: Network,
66    pub dht_client: DhtClient,
67    pub peer_resolver: PeerResolver,
68    pub overlay_service: OverlayService,
69
70    pub storage_context: StorageContext,
71    pub core_storage: CoreStorage,
72
73    pub blockchain_rpc_client: BlockchainRpcClient,
74
75    #[cfg(feature = "s3")]
76    pub s3_client: Option<S3Client>,
77}
78
79impl NodeBase {
80    const DEFAULT_INITIAL_PEER_COUNT: usize = 3;
81
82    pub fn builder<'a>(
83        base_config: &'a NodeBaseConfig,
84        global_config: &'a GlobalConfig,
85    ) -> NodeBaseBuilder<'a, ()> {
86        crate::record_version_metric();
87        NodeBaseBuilder::new(base_config, global_config)
88    }
89
90    /// Wait for some peers and boot the node.
91    pub async fn init(
92        &self,
93        boot_type: ColdBootType,
94        import_zerostate: Option<Vec<PathBuf>>,
95        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
96    ) -> Result<BlockId> {
97        self.init_ext(NodeBootArgs {
98            boot_type,
99            zerostates: import_zerostate,
100            queue_state_handler,
101            ..Default::default()
102        })
103        .await
104    }
105
106    /// Wait for some peers and boot the node.
107    pub async fn init_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
108        self.wait_for_neighbours(self.initial_peer_count).await;
109
110        let init_block_id = self.boot_ext(args).await.context("failed to init node")?;
111        tracing::info!(%init_block_id, "node initialized");
112
113        Ok(init_block_id)
114    }
115
116    /// Wait for at least `count` public overlay peers to resolve.
117    pub async fn wait_for_neighbours(&self, count: usize) {
118        // Ensure that there are some neighbours
119        tracing::info!("waiting for initial neighbours");
120        self.blockchain_rpc_client
121            .overlay_client()
122            .neighbours()
123            .wait_for_peers(count)
124            .await;
125        tracing::info!("found initial neighbours");
126    }
127
128    /// Initialize the node and return the init block id.
129    pub async fn boot(
130        &self,
131        boot_type: ColdBootType,
132        zerostates: Option<Vec<PathBuf>>,
133        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
134    ) -> Result<BlockId> {
135        self.boot_ext(NodeBootArgs {
136            boot_type,
137            zerostates,
138            queue_state_handler,
139            ..Default::default()
140        })
141        .await
142    }
143
144    /// Initialize the node and return the init block id.
145    pub async fn boot_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
146        let node_state = self.core_storage.node_state();
147
148        let last_mc_block_id = match node_state.load_last_mc_block_id() {
149            Some(block_id) => block_id,
150            None => {
151                let mut starter = Starter::builder()
152                    .with_storage(self.core_storage.clone())
153                    .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
154                    .with_zerostate_id(self.global_config.zerostate)
155                    .with_config(self.base_config.starter.clone())
156                    .ignore_states(args.ignore_states);
157
158                if let Some(handler) = args.queue_state_handler {
159                    starter = starter.with_queue_state_handler(handler);
160                }
161
162                #[cfg(feature = "s3")]
163                if let Some(s3_client) = self.s3_client.as_ref() {
164                    starter = starter.with_s3_client(s3_client.clone());
165                }
166
167                starter
168                    .build()
169                    .cold_boot(args.boot_type, args.zerostates.map(FileZerostateProvider))
170                    .await?
171            }
172        };
173
174        tracing::info!(
175            %last_mc_block_id,
176            "boot finished"
177        );
178
179        Ok(last_mc_block_id)
180    }
181
182    pub fn validator_resolver(&self) -> &ValidatorsResolver {
183        self.blockchain_rpc_client
184            .overlay_client()
185            .validators_resolver()
186    }
187
188    /// Update current validator targets with the specified set.
189    pub fn update_validator_set(&self, vset: &ValidatorSet) {
190        self.validator_resolver().update_validator_set(vset);
191    }
192
193    /// Update current validator targets using the validator set from the provider
194    pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
195        // notify subscriber with an initial validators list
196        let mc_state = self
197            .core_storage
198            .shard_state_storage()
199            .load_state(block_id.seqno, block_id)
200            .await?;
201
202        let config = mc_state.config_params()?;
203        let current_vset = config.get_current_validator_set()?;
204        self.update_validator_set(&current_vset);
205        Ok(())
206    }
207
208    pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
209        ArchiveBlockProvider::new(
210            (
211                self.blockchain_rpc_client.clone(),
212                #[cfg(feature = "s3")]
213                self.s3_client.clone(),
214            ),
215            self.core_storage.clone(),
216            self.base_config.archive_block_provider.clone(),
217        )
218    }
219
220    pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
221        BlockchainBlockProvider::new(
222            self.blockchain_rpc_client.clone(),
223            self.core_storage.clone(),
224            self.base_config.blockchain_block_provider.clone(),
225        )
226    }
227
228    pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
229        StorageBlockProvider::new(self.core_storage.clone())
230    }
231
232    /// Creates a new [`BlockStrider`] using options from the base config.
233    pub fn build_strider<P, S>(
234        &self,
235        provider: P,
236        subscriber: S,
237    ) -> BlockStrider<PersistentBlockStriderState, P, S>
238    where
239        P: BlockProvider,
240        S: BlockSubscriber,
241    {
242        let state = PersistentBlockStriderState::new(
243            self.global_config.zerostate.as_block_id(),
244            self.core_storage.clone(),
245        );
246
247        BlockStrider::builder()
248            .with_state(state)
249            .with_provider(provider)
250            .with_block_subscriber(subscriber)
251            .build()
252    }
253}
254
255pub struct NodeBaseBuilder<'a, Step = ()> {
256    common: NodeBaseBuilderCommon<'a>,
257    step: Step,
258}
259
260impl<'a> NodeBaseBuilder<'a, ()> {
261    pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
262        Self {
263            common: NodeBaseBuilderCommon {
264                base_config,
265                global_config,
266                initial_peer_count: NodeBase::DEFAULT_INITIAL_PEER_COUNT,
267            },
268            step: (),
269        }
270    }
271
272    pub fn init_network(
273        self,
274        public_addr: SocketAddr,
275        secret_key: &ed25519::SecretKey,
276    ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
277        let net = ConfiguredNetwork::new(
278            public_addr,
279            secret_key,
280            self.common.base_config,
281            &self.common.global_config.bootstrap_peers,
282        )?;
283
284        Ok(NodeBaseBuilder {
285            common: self.common,
286            step: init::Step0 { net },
287        })
288    }
289}
290
291impl<'a> NodeBaseBuilder<'a, init::Step0> {
292    // TODO: Add some options here if needed.
293    pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
294        let store = ConfiguredStorage::new(
295            &self.common.base_config.storage,
296            &self.common.base_config.core_storage,
297        )
298        .await?;
299
300        Ok(NodeBaseBuilder {
301            common: self.common,
302            step: init::Step1 {
303                prev_step: self.step,
304                store,
305            },
306        })
307    }
308}
309
310impl<'a> NodeBaseBuilder<'a, init::Step1> {
311    pub fn init_blockchain_rpc<RL, SL>(
312        self,
313        remote_broadcast_listener: RL,
314        self_broadcast_listener: SL,
315    ) -> Result<NodeBaseBuilder<'a, init::Step2>>
316    where
317        RL: BroadcastListener,
318        SL: SelfBroadcastListener,
319    {
320        let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
321            &self.common.global_config.zerostate,
322            self.step.store.core_storage.clone(),
323            remote_broadcast_listener,
324            self_broadcast_listener,
325            self.common.base_config,
326        );
327
328        Ok(NodeBaseBuilder {
329            common: self.common,
330            step: init::Step2 {
331                prev_step: self.step,
332                blockchain_rpc_client,
333            },
334        })
335    }
336}
337
338impl<'a> NodeBaseBuilder<'a, init::Final> {
339    pub fn build(self) -> Result<NodeBase> {
340        let net = self.step.prev_step.prev_step.net;
341        let store = self.step.prev_step.store;
342        let blockchain_rpc_client = self.step.blockchain_rpc_client;
343
344        Ok(NodeBase {
345            base_config: self.common.base_config.clone(),
346            global_config: self.common.global_config.clone(),
347            initial_peer_count: self.common.initial_peer_count,
348            keypair: net.keypair,
349            network: net.network,
350            dht_client: net.dht_client,
351            peer_resolver: net.peer_resolver,
352            overlay_service: net.overlay_service,
353            storage_context: store.context,
354            core_storage: store.core_storage,
355            blockchain_rpc_client,
356            #[cfg(feature = "s3")]
357            s3_client: self
358                .common
359                .base_config
360                .s3_client
361                .as_ref()
362                .map(S3Client::new)
363                .transpose()
364                .context("failed to create S3 client")?,
365        })
366    }
367}
368
369impl<'a, Step> NodeBaseBuilder<'a, Step> {
370    pub fn base_config(&self) -> &'a NodeBaseConfig {
371        self.common.base_config
372    }
373
374    pub fn global_config(&self) -> &'a GlobalConfig {
375        self.common.global_config
376    }
377
378    pub fn initial_peer_count(&self) -> usize {
379        self.common.initial_peer_count
380    }
381
382    pub fn with_initial_peer_count(mut self, count: usize) -> Self {
383        self.common.initial_peer_count = count;
384        self
385    }
386}
387
388impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
389    pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
390        &self.step.as_ref().net.keypair
391    }
392
393    pub fn network(&self) -> &Network {
394        &self.step.as_ref().net.network
395    }
396
397    pub fn dht_client(&self) -> &DhtClient {
398        &self.step.as_ref().net.dht_client
399    }
400
401    pub fn peer_resolver(&self) -> &PeerResolver {
402        &self.step.as_ref().net.peer_resolver
403    }
404
405    pub fn overlay_service(&self) -> &OverlayService {
406        &self.step.as_ref().net.overlay_service
407    }
408}
409
410impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
411    pub fn storage_context(&self) -> &StorageContext {
412        &self.step.as_ref().store.context
413    }
414
415    pub fn core_storage(&self) -> &CoreStorage {
416        &self.step.as_ref().store.core_storage
417    }
418}
419
420impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
421    pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
422        &self.step.as_ref().blockchain_rpc_client
423    }
424}
425
426struct NodeBaseBuilderCommon<'a> {
427    base_config: &'a NodeBaseConfig,
428    global_config: &'a GlobalConfig,
429    initial_peer_count: usize,
430}
431
432pub mod init {
433    use super::*;
434
435    pub type Final = Step2;
436
437    /// Node with network.
438    pub struct Step0 {
439        pub(super) net: ConfiguredNetwork,
440    }
441
442    impl AsRef<Step0> for Step0 {
443        #[inline]
444        fn as_ref(&self) -> &Step0 {
445            self
446        }
447    }
448
449    /// Node with network and storage.
450    pub struct Step1 {
451        pub(super) prev_step: Step0,
452        pub(super) store: ConfiguredStorage,
453    }
454
455    impl AsRef<Step0> for Step1 {
456        #[inline]
457        fn as_ref(&self) -> &Step0 {
458            &self.prev_step
459        }
460    }
461
462    impl AsRef<Step1> for Step1 {
463        #[inline]
464        fn as_ref(&self) -> &Step1 {
465            self
466        }
467    }
468
469    /// Node with network, storage and public overlay.
470    pub struct Step2 {
471        pub(super) prev_step: Step1,
472        pub(super) blockchain_rpc_client: BlockchainRpcClient,
473    }
474
475    impl AsRef<Step0> for Step2 {
476        #[inline]
477        fn as_ref(&self) -> &Step0 {
478            &self.prev_step.prev_step
479        }
480    }
481
482    impl AsRef<Step1> for Step2 {
483        #[inline]
484        fn as_ref(&self) -> &Step1 {
485            &self.prev_step
486        }
487    }
488
489    impl AsRef<Step2> for Step2 {
490        #[inline]
491        fn as_ref(&self) -> &Step2 {
492            self
493        }
494    }
495}
496
497pub struct ConfiguredNetwork {
498    pub keypair: Arc<ed25519::KeyPair>,
499    pub network: Network,
500    pub dht_client: DhtClient,
501    pub peer_resolver: PeerResolver,
502    pub overlay_service: OverlayService,
503}
504
505impl ConfiguredNetwork {
506    pub fn new(
507        public_addr: SocketAddr,
508        secret_key: &ed25519::SecretKey,
509        base_config: &NodeBaseConfig,
510        bootstrap_peers: &[PeerInfo],
511    ) -> Result<Self> {
512        // Setup network
513        let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
514        let local_id = keypair.public_key.into();
515
516        let (dht_tasks, dht_service) = DhtService::builder(local_id)
517            .with_config(base_config.dht.clone())
518            .build();
519
520        let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
521            .with_config(base_config.overlay.clone())
522            .with_dht_service(dht_service.clone())
523            .build();
524
525        let router = Router::builder()
526            .route(dht_service.clone())
527            .route(overlay_service.clone())
528            .build();
529
530        let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
531
532        let network = Network::builder()
533            .with_config(base_config.network.clone())
534            .with_private_key(secret_key.to_bytes())
535            .with_remote_addr(public_addr)
536            .build(local_addr, router)
537            .context("failed to build node network")?;
538
539        let bootstrap_peer_count = dht_tasks.spawn(&network, bootstrap_peers)?;
540        overlay_tasks.spawn(&network);
541
542        let dht_client = dht_service.make_client(&network);
543        let peer_resolver = dht_service
544            .make_peer_resolver()
545            .with_config(base_config.peer_resolver.clone())
546            .build(&network);
547
548        tracing::info!(
549            %local_id,
550            %local_addr,
551            %public_addr,
552            bootstrap_peers = bootstrap_peer_count,
553            "initialized network"
554        );
555
556        Ok(Self {
557            keypair,
558            network,
559            dht_client,
560            peer_resolver,
561            overlay_service,
562        })
563    }
564
565    pub fn add_blockchain_rpc<BL, SL>(
566        &self,
567        zerostate: &ZerostateId,
568        storage: CoreStorage,
569        remote_broadcast_listener: BL,
570        self_broadcast_listener: SL,
571        base_config: &NodeBaseConfig,
572    ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
573    where
574        BL: BroadcastListener,
575        SL: SelfBroadcastListener,
576    {
577        let blockchain_rpc_service = BlockchainRpcService::builder()
578            .with_config(base_config.blockchain_rpc_service.clone())
579            .with_storage(storage)
580            .with_broadcast_listener(remote_broadcast_listener)
581            .build();
582
583        let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
584            .named("blockchain_rpc")
585            .with_peer_resolver(self.peer_resolver.clone())
586            .build(blockchain_rpc_service.clone());
587        self.overlay_service.add_public_overlay(&public_overlay);
588
589        let blockchain_rpc_client = BlockchainRpcClient::builder()
590            .with_config(base_config.blockchain_rpc_client.clone())
591            .with_public_overlay_client(PublicOverlayClient::new(
592                self.network.clone(),
593                public_overlay,
594                base_config.public_overlay_client.clone(),
595            ))
596            .with_self_broadcast_listener(self_broadcast_listener)
597            .build();
598
599        tracing::info!(
600            overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
601            "initialized blockchain rpc"
602        );
603
604        (blockchain_rpc_service, blockchain_rpc_client)
605    }
606}
607
608pub struct ConfiguredStorage {
609    pub context: StorageContext,
610    pub core_storage: CoreStorage,
611}
612
613impl ConfiguredStorage {
614    pub async fn new(
615        storage_config: &StorageConfig,
616        core_storage_config: &CoreStorageConfig,
617    ) -> Result<Self> {
618        let context = StorageContext::new(storage_config.clone())
619            .await
620            .context("failed to create storage context")?;
621        let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
622            .await
623            .context("failed to create storage")?;
624        tracing::info!(
625            root_dir = %core_storage.context().root_dir().path().display(),
626            "initialized storage"
627        );
628
629        Ok(Self {
630            context,
631            core_storage,
632        })
633    }
634}