tycho_core/node/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8    DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13pub use self::config::NodeBaseConfig;
14pub use self::keys::NodeKeys;
15use crate::block_strider::{
16    ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
17    ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
18    StorageBlockProvider,
19};
20use crate::blockchain_rpc::{
21    BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
22};
23use crate::global_config::{GlobalConfig, ZerostateId};
24use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
25use crate::storage::{CoreStorage, CoreStorageConfig};
26
27mod config;
28mod keys;
29
30pub struct NodeBase {
31    pub base_config: NodeBaseConfig,
32    pub global_config: GlobalConfig,
33
34    pub keypair: Arc<ed25519::KeyPair>,
35    pub network: Network,
36    pub dht_client: DhtClient,
37    pub peer_resolver: PeerResolver,
38    pub overlay_service: OverlayService,
39
40    pub storage_context: StorageContext,
41    pub core_storage: CoreStorage,
42
43    pub blockchain_rpc_client: BlockchainRpcClient,
44}
45
46impl NodeBase {
47    pub fn builder<'a>(
48        base_config: &'a NodeBaseConfig,
49        global_config: &'a GlobalConfig,
50    ) -> NodeBaseBuilder<'a, ()> {
51        NodeBaseBuilder::new(base_config, global_config)
52    }
53
54    /// Wait for some peers and boot the node.
55    pub async fn init(
56        &self,
57        boot_type: ColdBootType,
58        import_zerostate: Option<Vec<PathBuf>>,
59        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
60    ) -> Result<BlockId> {
61        self.wait_for_neighbours(3).await;
62
63        let init_block_id = self
64            .boot(boot_type, import_zerostate, queue_state_handler)
65            .await
66            .context("failed to init node")?;
67
68        tracing::info!(%init_block_id, "node initialized");
69
70        Ok(init_block_id)
71    }
72
73    /// Wait for at least `count` public overlay peers to resolve.
74    pub async fn wait_for_neighbours(&self, count: usize) {
75        // Ensure that there are some neighbours
76        tracing::info!("waiting for initial neighbours");
77        self.blockchain_rpc_client
78            .overlay_client()
79            .neighbours()
80            .wait_for_peers(count)
81            .await;
82        tracing::info!("found initial neighbours");
83    }
84
85    /// Initialize the node and return the init block id.
86    pub async fn boot(
87        &self,
88        boot_type: ColdBootType,
89        zerostates: Option<Vec<PathBuf>>,
90        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
91    ) -> Result<BlockId> {
92        let node_state = self.core_storage.node_state();
93
94        let last_mc_block_id = match node_state.load_last_mc_block_id() {
95            Some(block_id) => block_id,
96            None => {
97                let mut starter = Starter::builder()
98                    .with_storage(self.core_storage.clone())
99                    .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
100                    .with_zerostate_id(self.global_config.zerostate)
101                    .with_config(self.base_config.starter.clone());
102
103                if let Some(handler) = queue_state_handler {
104                    starter = starter.with_queue_state_handler(handler);
105                }
106
107                starter
108                    .build()
109                    .cold_boot(boot_type, zerostates.map(FileZerostateProvider))
110                    .await?
111            }
112        };
113
114        tracing::info!(
115            %last_mc_block_id,
116            "boot finished"
117        );
118
119        Ok(last_mc_block_id)
120    }
121
122    pub fn validator_resolver(&self) -> &ValidatorsResolver {
123        self.blockchain_rpc_client
124            .overlay_client()
125            .validators_resolver()
126    }
127
128    /// Update current validator targets with the specified set.
129    pub fn update_validator_set(&self, vset: &ValidatorSet) {
130        self.validator_resolver().update_validator_set(vset);
131    }
132
133    /// Update current validator targets using the validator set from the provider
134    pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
135        // notify subscriber with an initial validators list
136        let mc_state = self
137            .core_storage
138            .shard_state_storage()
139            .load_state(block_id)
140            .await?;
141
142        let config = mc_state.config_params()?;
143        let current_vset = config.get_current_validator_set()?;
144        self.update_validator_set(&current_vset);
145        Ok(())
146    }
147
148    pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
149        ArchiveBlockProvider::new(
150            self.blockchain_rpc_client.clone(),
151            self.core_storage.clone(),
152            self.base_config.archive_block_provider.clone(),
153        )
154    }
155
156    pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
157        BlockchainBlockProvider::new(
158            self.blockchain_rpc_client.clone(),
159            self.core_storage.clone(),
160            self.base_config.blockchain_block_provider.clone(),
161        )
162    }
163
164    pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
165        StorageBlockProvider::new(self.core_storage.clone())
166    }
167
168    /// Creates a new [`BlockStrider`] using options from the base config.
169    pub fn build_strider<P, S>(
170        &self,
171        provider: P,
172        subscriber: S,
173    ) -> BlockStrider<PersistentBlockStriderState, P, S>
174    where
175        P: BlockProvider,
176        S: BlockSubscriber,
177    {
178        let state = PersistentBlockStriderState::new(
179            self.global_config.zerostate.as_block_id(),
180            self.core_storage.clone(),
181        );
182
183        BlockStrider::builder()
184            .with_state(state)
185            .with_provider(provider)
186            .with_block_subscriber(subscriber)
187            .build()
188    }
189}
190
191pub struct NodeBaseBuilder<'a, Step = ()> {
192    base_config: &'a NodeBaseConfig,
193    global_config: &'a GlobalConfig,
194    step: Step,
195}
196
197impl<'a> NodeBaseBuilder<'a, ()> {
198    pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
199        Self {
200            base_config,
201            global_config,
202            step: (),
203        }
204    }
205
206    pub fn init_network(
207        self,
208        public_addr: SocketAddr,
209        secret_key: &ed25519::SecretKey,
210    ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
211        let net = ConfiguredNetwork::new(
212            public_addr,
213            secret_key,
214            self.base_config,
215            &self.global_config.bootstrap_peers,
216        )?;
217
218        Ok(NodeBaseBuilder {
219            base_config: self.base_config,
220            global_config: self.global_config,
221            step: init::Step0 { net },
222        })
223    }
224}
225
226impl<'a> NodeBaseBuilder<'a, init::Step0> {
227    // TODO: Add some options here if needed.
228    pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
229        let store =
230            ConfiguredStorage::new(&self.base_config.storage, &self.base_config.core_storage)
231                .await?;
232
233        Ok(NodeBaseBuilder {
234            base_config: self.base_config,
235            global_config: self.global_config,
236            step: init::Step1 {
237                prev_step: self.step,
238                store,
239            },
240        })
241    }
242}
243
244impl<'a> NodeBaseBuilder<'a, init::Step1> {
245    pub fn init_blockchain_rpc<RL, SL>(
246        self,
247        remote_broadcast_listener: RL,
248        self_broadcast_listener: SL,
249    ) -> Result<NodeBaseBuilder<'a, init::Step2>>
250    where
251        RL: BroadcastListener,
252        SL: SelfBroadcastListener,
253    {
254        let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
255            &self.global_config.zerostate,
256            self.step.store.core_storage.clone(),
257            remote_broadcast_listener,
258            self_broadcast_listener,
259            self.base_config,
260        );
261
262        Ok(NodeBaseBuilder {
263            base_config: self.base_config,
264            global_config: self.global_config,
265            step: init::Step2 {
266                prev_step: self.step,
267                blockchain_rpc_client,
268            },
269        })
270    }
271}
272
273impl NodeBaseBuilder<'_, init::Final> {
274    pub fn build(self) -> Result<NodeBase> {
275        let net = self.step.prev_step.prev_step.net;
276        let store = self.step.prev_step.store;
277        let blockchain_rpc_client = self.step.blockchain_rpc_client;
278
279        Ok(NodeBase {
280            base_config: self.base_config.clone(),
281            global_config: self.global_config.clone(),
282            keypair: net.keypair,
283            network: net.network,
284            dht_client: net.dht_client,
285            peer_resolver: net.peer_resolver,
286            overlay_service: net.overlay_service,
287            storage_context: store.context,
288            core_storage: store.core_storage,
289            blockchain_rpc_client,
290        })
291    }
292}
293
294impl<'a, Step> NodeBaseBuilder<'a, Step> {
295    pub fn base_config(&self) -> &'a NodeBaseConfig {
296        self.base_config
297    }
298
299    pub fn global_config(&self) -> &'a GlobalConfig {
300        self.global_config
301    }
302}
303
304impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
305    pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
306        &self.step.as_ref().net.keypair
307    }
308
309    pub fn network(&self) -> &Network {
310        &self.step.as_ref().net.network
311    }
312
313    pub fn dht_client(&self) -> &DhtClient {
314        &self.step.as_ref().net.dht_client
315    }
316
317    pub fn peer_resolver(&self) -> &PeerResolver {
318        &self.step.as_ref().net.peer_resolver
319    }
320
321    pub fn overlay_service(&self) -> &OverlayService {
322        &self.step.as_ref().net.overlay_service
323    }
324}
325
326impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
327    pub fn storage_context(&self) -> &StorageContext {
328        &self.step.as_ref().store.context
329    }
330
331    pub fn core_storage(&self) -> &CoreStorage {
332        &self.step.as_ref().store.core_storage
333    }
334}
335
336impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
337    pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
338        &self.step.as_ref().blockchain_rpc_client
339    }
340}
341
342pub mod init {
343    use super::*;
344
345    pub type Final = Step2;
346
347    /// Node with network.
348    pub struct Step0 {
349        pub(super) net: ConfiguredNetwork,
350    }
351
352    impl AsRef<Step0> for Step0 {
353        #[inline]
354        fn as_ref(&self) -> &Step0 {
355            self
356        }
357    }
358
359    /// Node with network and storage.
360    pub struct Step1 {
361        pub(super) prev_step: Step0,
362        pub(super) store: ConfiguredStorage,
363    }
364
365    impl AsRef<Step0> for Step1 {
366        #[inline]
367        fn as_ref(&self) -> &Step0 {
368            &self.prev_step
369        }
370    }
371
372    impl AsRef<Step1> for Step1 {
373        #[inline]
374        fn as_ref(&self) -> &Step1 {
375            self
376        }
377    }
378
379    /// Node with network, storage and public overlay.
380    pub struct Step2 {
381        pub(super) prev_step: Step1,
382        pub(super) blockchain_rpc_client: BlockchainRpcClient,
383    }
384
385    impl AsRef<Step0> for Step2 {
386        #[inline]
387        fn as_ref(&self) -> &Step0 {
388            &self.prev_step.prev_step
389        }
390    }
391
392    impl AsRef<Step1> for Step2 {
393        #[inline]
394        fn as_ref(&self) -> &Step1 {
395            &self.prev_step
396        }
397    }
398
399    impl AsRef<Step2> for Step2 {
400        #[inline]
401        fn as_ref(&self) -> &Step2 {
402            self
403        }
404    }
405}
406
407pub struct ConfiguredNetwork {
408    pub keypair: Arc<ed25519::KeyPair>,
409    pub network: Network,
410    pub dht_client: DhtClient,
411    pub peer_resolver: PeerResolver,
412    pub overlay_service: OverlayService,
413}
414
415impl ConfiguredNetwork {
416    pub fn new(
417        public_addr: SocketAddr,
418        secret_key: &ed25519::SecretKey,
419        base_config: &NodeBaseConfig,
420        bootstrap_peers: &[PeerInfo],
421    ) -> Result<Self> {
422        // Setup network
423        let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
424        let local_id = keypair.public_key.into();
425
426        let (dht_tasks, dht_service) = DhtService::builder(local_id)
427            .with_config(base_config.dht.clone())
428            .build();
429
430        let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
431            .with_config(base_config.overlay.clone())
432            .with_dht_service(dht_service.clone())
433            .build();
434
435        let router = Router::builder()
436            .route(dht_service.clone())
437            .route(overlay_service.clone())
438            .build();
439
440        let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
441
442        let network = Network::builder()
443            .with_config(base_config.network.clone())
444            .with_private_key(secret_key.to_bytes())
445            .with_remote_addr(public_addr)
446            .build(local_addr, router)
447            .context("failed to build node network")?;
448
449        dht_tasks.spawn(&network);
450        overlay_tasks.spawn(&network);
451
452        let dht_client = dht_service.make_client(&network);
453        let peer_resolver = dht_service
454            .make_peer_resolver()
455            .with_config(base_config.peer_resolver.clone())
456            .build(&network);
457
458        let mut bootstrap_peer_count = 0usize;
459        for peer in bootstrap_peers {
460            let is_new = dht_client.add_peer(Arc::new(peer.clone()))?;
461            bootstrap_peer_count += is_new as usize;
462        }
463
464        tracing::info!(
465            %local_id,
466            %local_addr,
467            %public_addr,
468            bootstrap_peers = bootstrap_peer_count,
469            "initialized network"
470        );
471
472        Ok(Self {
473            keypair,
474            network,
475            dht_client,
476            peer_resolver,
477            overlay_service,
478        })
479    }
480
481    pub fn add_blockchain_rpc<BL, SL>(
482        &self,
483        zerostate: &ZerostateId,
484        storage: CoreStorage,
485        remote_broadcast_listener: BL,
486        self_broadcast_listener: SL,
487        base_config: &NodeBaseConfig,
488    ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
489    where
490        BL: BroadcastListener,
491        SL: SelfBroadcastListener,
492    {
493        let blockchain_rpc_service = BlockchainRpcService::builder()
494            .with_config(base_config.blockchain_rpc_service.clone())
495            .with_storage(storage)
496            .with_broadcast_listener(remote_broadcast_listener)
497            .build();
498
499        let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
500            .named("blockchain_rpc")
501            .with_peer_resolver(self.peer_resolver.clone())
502            .build(blockchain_rpc_service.clone());
503        self.overlay_service.add_public_overlay(&public_overlay);
504
505        let blockchain_rpc_client = BlockchainRpcClient::builder()
506            .with_config(base_config.blockchain_rpc_client.clone())
507            .with_public_overlay_client(PublicOverlayClient::new(
508                self.network.clone(),
509                public_overlay,
510                base_config.public_overlay_client.clone(),
511            ))
512            .with_self_broadcast_listener(self_broadcast_listener)
513            .build();
514
515        tracing::info!(
516            overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
517            "initialized blockchain rpc"
518        );
519
520        (blockchain_rpc_service, blockchain_rpc_client)
521    }
522}
523
524pub struct ConfiguredStorage {
525    pub context: StorageContext,
526    pub core_storage: CoreStorage,
527}
528
529impl ConfiguredStorage {
530    pub async fn new(
531        storage_config: &StorageConfig,
532        core_storage_config: &CoreStorageConfig,
533    ) -> Result<Self> {
534        let context = StorageContext::new(storage_config.clone())
535            .await
536            .context("failed to create storage context")?;
537        let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
538            .await
539            .context("failed to create storage")?;
540        tracing::info!(
541            root_dir = %core_storage.context().root_dir().path().display(),
542            "initialized storage"
543        );
544
545        Ok(Self {
546            context,
547            core_storage,
548        })
549    }
550}