tycho_core/node/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8    DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13pub use self::config::NodeBaseConfig;
14pub use self::keys::NodeKeys;
15use crate::block_strider::{
16    ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
17    ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
18    StorageBlockProvider,
19};
20use crate::blockchain_rpc::{
21    BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
22};
23use crate::global_config::{GlobalConfig, ZerostateId};
24use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
25#[cfg(feature = "s3")]
26use crate::s3::S3Client;
27use crate::storage::{CoreStorage, CoreStorageConfig};
28
29mod config;
30mod keys;
31
32pub struct NodeBase {
33    pub base_config: NodeBaseConfig,
34    pub global_config: GlobalConfig,
35
36    pub keypair: Arc<ed25519::KeyPair>,
37    pub network: Network,
38    pub dht_client: DhtClient,
39    pub peer_resolver: PeerResolver,
40    pub overlay_service: OverlayService,
41
42    pub storage_context: StorageContext,
43    pub core_storage: CoreStorage,
44
45    pub blockchain_rpc_client: BlockchainRpcClient,
46
47    #[cfg(feature = "s3")]
48    pub s3_client: Option<S3Client>,
49}
50
51impl NodeBase {
52    pub fn builder<'a>(
53        base_config: &'a NodeBaseConfig,
54        global_config: &'a GlobalConfig,
55    ) -> NodeBaseBuilder<'a, ()> {
56        crate::record_version_metric();
57        NodeBaseBuilder::new(base_config, global_config)
58    }
59
60    /// Wait for some peers and boot the node.
61    pub async fn init(
62        &self,
63        boot_type: ColdBootType,
64        import_zerostate: Option<Vec<PathBuf>>,
65        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
66    ) -> Result<BlockId> {
67        self.wait_for_neighbours(3).await;
68
69        let init_block_id = self
70            .boot(boot_type, import_zerostate, queue_state_handler)
71            .await
72            .context("failed to init node")?;
73
74        tracing::info!(%init_block_id, "node initialized");
75
76        Ok(init_block_id)
77    }
78
79    /// Wait for at least `count` public overlay peers to resolve.
80    pub async fn wait_for_neighbours(&self, count: usize) {
81        // Ensure that there are some neighbours
82        tracing::info!("waiting for initial neighbours");
83        self.blockchain_rpc_client
84            .overlay_client()
85            .neighbours()
86            .wait_for_peers(count)
87            .await;
88        tracing::info!("found initial neighbours");
89    }
90
91    /// Initialize the node and return the init block id.
92    pub async fn boot(
93        &self,
94        boot_type: ColdBootType,
95        zerostates: Option<Vec<PathBuf>>,
96        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
97    ) -> Result<BlockId> {
98        let node_state = self.core_storage.node_state();
99
100        let last_mc_block_id = match node_state.load_last_mc_block_id() {
101            Some(block_id) => block_id,
102            None => {
103                let mut starter = Starter::builder()
104                    .with_storage(self.core_storage.clone())
105                    .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
106                    .with_zerostate_id(self.global_config.zerostate)
107                    .with_config(self.base_config.starter.clone());
108
109                if let Some(handler) = queue_state_handler {
110                    starter = starter.with_queue_state_handler(handler);
111                }
112
113                starter
114                    .build()
115                    .cold_boot(boot_type, zerostates.map(FileZerostateProvider))
116                    .await?
117            }
118        };
119
120        tracing::info!(
121            %last_mc_block_id,
122            "boot finished"
123        );
124
125        Ok(last_mc_block_id)
126    }
127
128    pub fn validator_resolver(&self) -> &ValidatorsResolver {
129        self.blockchain_rpc_client
130            .overlay_client()
131            .validators_resolver()
132    }
133
134    /// Update current validator targets with the specified set.
135    pub fn update_validator_set(&self, vset: &ValidatorSet) {
136        self.validator_resolver().update_validator_set(vset);
137    }
138
139    /// Update current validator targets using the validator set from the provider
140    pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
141        // notify subscriber with an initial validators list
142        let mc_state = self
143            .core_storage
144            .shard_state_storage()
145            .load_state(block_id.seqno, block_id)
146            .await?;
147
148        let config = mc_state.config_params()?;
149        let current_vset = config.get_current_validator_set()?;
150        self.update_validator_set(&current_vset);
151        Ok(())
152    }
153
154    pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
155        ArchiveBlockProvider::new(
156            (
157                self.blockchain_rpc_client.clone(),
158                #[cfg(feature = "s3")]
159                self.s3_client.clone(),
160            ),
161            self.core_storage.clone(),
162            self.base_config.archive_block_provider.clone(),
163        )
164    }
165
166    pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
167        BlockchainBlockProvider::new(
168            self.blockchain_rpc_client.clone(),
169            self.core_storage.clone(),
170            self.base_config.blockchain_block_provider.clone(),
171        )
172    }
173
174    pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
175        StorageBlockProvider::new(self.core_storage.clone())
176    }
177
178    /// Creates a new [`BlockStrider`] using options from the base config.
179    pub fn build_strider<P, S>(
180        &self,
181        provider: P,
182        subscriber: S,
183    ) -> BlockStrider<PersistentBlockStriderState, P, S>
184    where
185        P: BlockProvider,
186        S: BlockSubscriber,
187    {
188        let state = PersistentBlockStriderState::new(
189            self.global_config.zerostate.as_block_id(),
190            self.core_storage.clone(),
191        );
192
193        BlockStrider::builder()
194            .with_state(state)
195            .with_provider(provider)
196            .with_block_subscriber(subscriber)
197            .build()
198    }
199}
200
201pub struct NodeBaseBuilder<'a, Step = ()> {
202    base_config: &'a NodeBaseConfig,
203    global_config: &'a GlobalConfig,
204    step: Step,
205}
206
207impl<'a> NodeBaseBuilder<'a, ()> {
208    pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
209        Self {
210            base_config,
211            global_config,
212            step: (),
213        }
214    }
215
216    pub fn init_network(
217        self,
218        public_addr: SocketAddr,
219        secret_key: &ed25519::SecretKey,
220    ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
221        let net = ConfiguredNetwork::new(
222            public_addr,
223            secret_key,
224            self.base_config,
225            &self.global_config.bootstrap_peers,
226        )?;
227
228        Ok(NodeBaseBuilder {
229            base_config: self.base_config,
230            global_config: self.global_config,
231            step: init::Step0 { net },
232        })
233    }
234}
235
236impl<'a> NodeBaseBuilder<'a, init::Step0> {
237    // TODO: Add some options here if needed.
238    pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
239        let store =
240            ConfiguredStorage::new(&self.base_config.storage, &self.base_config.core_storage)
241                .await?;
242
243        Ok(NodeBaseBuilder {
244            base_config: self.base_config,
245            global_config: self.global_config,
246            step: init::Step1 {
247                prev_step: self.step,
248                store,
249            },
250        })
251    }
252}
253
254impl<'a> NodeBaseBuilder<'a, init::Step1> {
255    pub fn init_blockchain_rpc<RL, SL>(
256        self,
257        remote_broadcast_listener: RL,
258        self_broadcast_listener: SL,
259    ) -> Result<NodeBaseBuilder<'a, init::Step2>>
260    where
261        RL: BroadcastListener,
262        SL: SelfBroadcastListener,
263    {
264        let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
265            &self.global_config.zerostate,
266            self.step.store.core_storage.clone(),
267            remote_broadcast_listener,
268            self_broadcast_listener,
269            self.base_config,
270        );
271
272        Ok(NodeBaseBuilder {
273            base_config: self.base_config,
274            global_config: self.global_config,
275            step: init::Step2 {
276                prev_step: self.step,
277                blockchain_rpc_client,
278            },
279        })
280    }
281}
282
283impl<'a> NodeBaseBuilder<'a, init::Final> {
284    pub fn build(self) -> Result<NodeBase> {
285        let net = self.step.prev_step.prev_step.net;
286        let store = self.step.prev_step.store;
287        let blockchain_rpc_client = self.step.blockchain_rpc_client;
288
289        Ok(NodeBase {
290            base_config: self.base_config.clone(),
291            global_config: self.global_config.clone(),
292            keypair: net.keypair,
293            network: net.network,
294            dht_client: net.dht_client,
295            peer_resolver: net.peer_resolver,
296            overlay_service: net.overlay_service,
297            storage_context: store.context,
298            core_storage: store.core_storage,
299            blockchain_rpc_client,
300            #[cfg(feature = "s3")]
301            s3_client: self
302                .base_config
303                .s3_client
304                .as_ref()
305                .map(S3Client::new)
306                .transpose()
307                .context("failed to create S3 client")?,
308        })
309    }
310}
311
312impl<'a, Step> NodeBaseBuilder<'a, Step> {
313    pub fn base_config(&self) -> &'a NodeBaseConfig {
314        self.base_config
315    }
316
317    pub fn global_config(&self) -> &'a GlobalConfig {
318        self.global_config
319    }
320}
321
322impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
323    pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
324        &self.step.as_ref().net.keypair
325    }
326
327    pub fn network(&self) -> &Network {
328        &self.step.as_ref().net.network
329    }
330
331    pub fn dht_client(&self) -> &DhtClient {
332        &self.step.as_ref().net.dht_client
333    }
334
335    pub fn peer_resolver(&self) -> &PeerResolver {
336        &self.step.as_ref().net.peer_resolver
337    }
338
339    pub fn overlay_service(&self) -> &OverlayService {
340        &self.step.as_ref().net.overlay_service
341    }
342}
343
344impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
345    pub fn storage_context(&self) -> &StorageContext {
346        &self.step.as_ref().store.context
347    }
348
349    pub fn core_storage(&self) -> &CoreStorage {
350        &self.step.as_ref().store.core_storage
351    }
352}
353
354impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
355    pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
356        &self.step.as_ref().blockchain_rpc_client
357    }
358}
359
360pub mod init {
361    use super::*;
362
363    pub type Final = Step2;
364
365    /// Node with network.
366    pub struct Step0 {
367        pub(super) net: ConfiguredNetwork,
368    }
369
370    impl AsRef<Step0> for Step0 {
371        #[inline]
372        fn as_ref(&self) -> &Step0 {
373            self
374        }
375    }
376
377    /// Node with network and storage.
378    pub struct Step1 {
379        pub(super) prev_step: Step0,
380        pub(super) store: ConfiguredStorage,
381    }
382
383    impl AsRef<Step0> for Step1 {
384        #[inline]
385        fn as_ref(&self) -> &Step0 {
386            &self.prev_step
387        }
388    }
389
390    impl AsRef<Step1> for Step1 {
391        #[inline]
392        fn as_ref(&self) -> &Step1 {
393            self
394        }
395    }
396
397    /// Node with network, storage and public overlay.
398    pub struct Step2 {
399        pub(super) prev_step: Step1,
400        pub(super) blockchain_rpc_client: BlockchainRpcClient,
401    }
402
403    impl AsRef<Step0> for Step2 {
404        #[inline]
405        fn as_ref(&self) -> &Step0 {
406            &self.prev_step.prev_step
407        }
408    }
409
410    impl AsRef<Step1> for Step2 {
411        #[inline]
412        fn as_ref(&self) -> &Step1 {
413            &self.prev_step
414        }
415    }
416
417    impl AsRef<Step2> for Step2 {
418        #[inline]
419        fn as_ref(&self) -> &Step2 {
420            self
421        }
422    }
423}
424
425pub struct ConfiguredNetwork {
426    pub keypair: Arc<ed25519::KeyPair>,
427    pub network: Network,
428    pub dht_client: DhtClient,
429    pub peer_resolver: PeerResolver,
430    pub overlay_service: OverlayService,
431}
432
433impl ConfiguredNetwork {
434    pub fn new(
435        public_addr: SocketAddr,
436        secret_key: &ed25519::SecretKey,
437        base_config: &NodeBaseConfig,
438        bootstrap_peers: &[PeerInfo],
439    ) -> Result<Self> {
440        // Setup network
441        let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
442        let local_id = keypair.public_key.into();
443
444        let (dht_tasks, dht_service) = DhtService::builder(local_id)
445            .with_config(base_config.dht.clone())
446            .build();
447
448        let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
449            .with_config(base_config.overlay.clone())
450            .with_dht_service(dht_service.clone())
451            .build();
452
453        let router = Router::builder()
454            .route(dht_service.clone())
455            .route(overlay_service.clone())
456            .build();
457
458        let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
459
460        let network = Network::builder()
461            .with_config(base_config.network.clone())
462            .with_private_key(secret_key.to_bytes())
463            .with_remote_addr(public_addr)
464            .build(local_addr, router)
465            .context("failed to build node network")?;
466
467        dht_tasks.spawn(&network);
468        overlay_tasks.spawn(&network);
469
470        let dht_client = dht_service.make_client(&network);
471        let peer_resolver = dht_service
472            .make_peer_resolver()
473            .with_config(base_config.peer_resolver.clone())
474            .build(&network);
475
476        let mut bootstrap_peer_count = 0usize;
477        for peer in bootstrap_peers {
478            let is_new = dht_client.add_peer(Arc::new(peer.clone()))?;
479            bootstrap_peer_count += is_new as usize;
480        }
481
482        tracing::info!(
483            %local_id,
484            %local_addr,
485            %public_addr,
486            bootstrap_peers = bootstrap_peer_count,
487            "initialized network"
488        );
489
490        Ok(Self {
491            keypair,
492            network,
493            dht_client,
494            peer_resolver,
495            overlay_service,
496        })
497    }
498
499    pub fn add_blockchain_rpc<BL, SL>(
500        &self,
501        zerostate: &ZerostateId,
502        storage: CoreStorage,
503        remote_broadcast_listener: BL,
504        self_broadcast_listener: SL,
505        base_config: &NodeBaseConfig,
506    ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
507    where
508        BL: BroadcastListener,
509        SL: SelfBroadcastListener,
510    {
511        let blockchain_rpc_service = BlockchainRpcService::builder()
512            .with_config(base_config.blockchain_rpc_service.clone())
513            .with_storage(storage)
514            .with_broadcast_listener(remote_broadcast_listener)
515            .build();
516
517        let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
518            .named("blockchain_rpc")
519            .with_peer_resolver(self.peer_resolver.clone())
520            .build(blockchain_rpc_service.clone());
521        self.overlay_service.add_public_overlay(&public_overlay);
522
523        let blockchain_rpc_client = BlockchainRpcClient::builder()
524            .with_config(base_config.blockchain_rpc_client.clone())
525            .with_public_overlay_client(PublicOverlayClient::new(
526                self.network.clone(),
527                public_overlay,
528                base_config.public_overlay_client.clone(),
529            ))
530            .with_self_broadcast_listener(self_broadcast_listener)
531            .build();
532
533        tracing::info!(
534            overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
535            "initialized blockchain rpc"
536        );
537
538        (blockchain_rpc_service, blockchain_rpc_client)
539    }
540}
541
542pub struct ConfiguredStorage {
543    pub context: StorageContext,
544    pub core_storage: CoreStorage,
545}
546
547impl ConfiguredStorage {
548    pub async fn new(
549        storage_config: &StorageConfig,
550        core_storage_config: &CoreStorageConfig,
551    ) -> Result<Self> {
552        let context = StorageContext::new(storage_config.clone())
553            .await
554            .context("failed to create storage context")?;
555        let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
556            .await
557            .context("failed to create storage")?;
558        tracing::info!(
559            root_dir = %core_storage.context().root_dir().path().display(),
560            "initialized storage"
561        );
562
563        Ok(Self {
564            context,
565            core_storage,
566        })
567    }
568}