tycho_core/node/
mod.rs

1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8    DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13pub use self::config::NodeBaseConfig;
14pub use self::keys::NodeKeys;
15use crate::block_strider::{
16    ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
17    ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
18    StorageBlockProvider,
19};
20use crate::blockchain_rpc::{
21    BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
22};
23use crate::global_config::{GlobalConfig, ZerostateId};
24use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
25use crate::storage::{CoreStorage, CoreStorageConfig};
26
27mod config;
28mod keys;
29
30pub struct NodeBase {
31    pub base_config: NodeBaseConfig,
32    pub global_config: GlobalConfig,
33
34    pub keypair: Arc<ed25519::KeyPair>,
35    pub network: Network,
36    pub dht_client: DhtClient,
37    pub peer_resolver: PeerResolver,
38    pub overlay_service: OverlayService,
39
40    pub storage_context: StorageContext,
41    pub core_storage: CoreStorage,
42
43    pub blockchain_rpc_client: BlockchainRpcClient,
44}
45
46impl NodeBase {
47    pub fn builder<'a>(
48        base_config: &'a NodeBaseConfig,
49        global_config: &'a GlobalConfig,
50    ) -> NodeBaseBuilder<'a, ()> {
51        NodeBaseBuilder::new(base_config, global_config)
52    }
53
54    /// Wait for some peers and boot the node.
55    pub async fn init(
56        &self,
57        boot_type: ColdBootType,
58        import_zerostate: Option<Vec<PathBuf>>,
59        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
60    ) -> Result<BlockId> {
61        self.wait_for_neighbours(3).await;
62
63        let init_block_id = self
64            .boot(boot_type, import_zerostate, queue_state_handler)
65            .await
66            .context("failed to init node")?;
67
68        tracing::info!(%init_block_id, "node initialized");
69
70        Ok(init_block_id)
71    }
72
73    /// Wait for at least `count` public overlay peers to resolve.
74    pub async fn wait_for_neighbours(&self, count: usize) {
75        // Ensure that there are some neighbours
76        tracing::info!("waiting for initial neighbours");
77        self.blockchain_rpc_client
78            .overlay_client()
79            .neighbours()
80            .wait_for_peers(count)
81            .await;
82        tracing::info!("found initial neighbours");
83    }
84
85    /// Initialize the node and return the init block id.
86    pub async fn boot(
87        &self,
88        boot_type: ColdBootType,
89        zerostates: Option<Vec<PathBuf>>,
90        queue_state_handler: Option<Box<dyn QueueStateHandler>>,
91    ) -> Result<BlockId> {
92        let node_state = self.core_storage.node_state();
93
94        let last_mc_block_id = match node_state.load_last_mc_block_id() {
95            Some(block_id) => block_id,
96            None => {
97                let mut starter = Starter::builder()
98                    .with_storage(self.core_storage.clone())
99                    .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
100                    .with_zerostate_id(self.global_config.zerostate)
101                    .with_config(self.base_config.starter.clone());
102
103                if let Some(handler) = queue_state_handler {
104                    starter = starter.with_queue_state_handler(handler);
105                }
106
107                starter
108                    .build()
109                    .cold_boot(boot_type, zerostates.map(FileZerostateProvider))
110                    .await?
111            }
112        };
113
114        let shard_state = self.core_storage.shard_state_storage();
115        Starter::init_allowed_workchains(shard_state, &last_mc_block_id).await?;
116
117        tracing::info!(
118            %last_mc_block_id,
119            "boot finished"
120        );
121
122        Ok(last_mc_block_id)
123    }
124
125    pub fn validator_resolver(&self) -> &ValidatorsResolver {
126        self.blockchain_rpc_client
127            .overlay_client()
128            .validators_resolver()
129    }
130
131    /// Update current validator targets with the specified set.
132    pub fn update_validator_set(&self, vset: &ValidatorSet) {
133        self.validator_resolver().update_validator_set(vset);
134    }
135
136    /// Update current validator targets using the validator set from the provider
137    pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
138        // notify subscriber with an initial validators list
139        let mc_state = self
140            .core_storage
141            .shard_state_storage()
142            .load_state(block_id.seqno, block_id)
143            .await?;
144
145        let config = mc_state.config_params()?;
146        let current_vset = config.get_current_validator_set()?;
147        self.update_validator_set(&current_vset);
148        Ok(())
149    }
150
151    pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
152        ArchiveBlockProvider::new(
153            self.blockchain_rpc_client.clone(),
154            self.core_storage.clone(),
155            self.base_config.archive_block_provider.clone(),
156        )
157    }
158
159    pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
160        BlockchainBlockProvider::new(
161            self.blockchain_rpc_client.clone(),
162            self.core_storage.clone(),
163            self.base_config.blockchain_block_provider.clone(),
164        )
165    }
166
167    pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
168        StorageBlockProvider::new(self.core_storage.clone())
169    }
170
171    /// Creates a new [`BlockStrider`] using options from the base config.
172    pub fn build_strider<P, S>(
173        &self,
174        provider: P,
175        subscriber: S,
176    ) -> BlockStrider<PersistentBlockStriderState, P, S>
177    where
178        P: BlockProvider,
179        S: BlockSubscriber,
180    {
181        let state = PersistentBlockStriderState::new(
182            self.global_config.zerostate.as_block_id(),
183            self.core_storage.clone(),
184        );
185
186        BlockStrider::builder()
187            .with_state(state)
188            .with_provider(provider)
189            .with_block_subscriber(subscriber)
190            .build()
191    }
192}
193
194pub struct NodeBaseBuilder<'a, Step = ()> {
195    base_config: &'a NodeBaseConfig,
196    global_config: &'a GlobalConfig,
197    step: Step,
198}
199
200impl<'a> NodeBaseBuilder<'a, ()> {
201    pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
202        Self {
203            base_config,
204            global_config,
205            step: (),
206        }
207    }
208
209    pub fn init_network(
210        self,
211        public_addr: SocketAddr,
212        secret_key: &ed25519::SecretKey,
213    ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
214        let net = ConfiguredNetwork::new(
215            public_addr,
216            secret_key,
217            self.base_config,
218            &self.global_config.bootstrap_peers,
219        )?;
220
221        Ok(NodeBaseBuilder {
222            base_config: self.base_config,
223            global_config: self.global_config,
224            step: init::Step0 { net },
225        })
226    }
227}
228
229impl<'a> NodeBaseBuilder<'a, init::Step0> {
230    // TODO: Add some options here if needed.
231    pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
232        let store =
233            ConfiguredStorage::new(&self.base_config.storage, &self.base_config.core_storage)
234                .await?;
235
236        Ok(NodeBaseBuilder {
237            base_config: self.base_config,
238            global_config: self.global_config,
239            step: init::Step1 {
240                prev_step: self.step,
241                store,
242            },
243        })
244    }
245}
246
247impl<'a> NodeBaseBuilder<'a, init::Step1> {
248    pub fn init_blockchain_rpc<RL, SL>(
249        self,
250        remote_broadcast_listener: RL,
251        self_broadcast_listener: SL,
252    ) -> Result<NodeBaseBuilder<'a, init::Step2>>
253    where
254        RL: BroadcastListener,
255        SL: SelfBroadcastListener,
256    {
257        let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
258            &self.global_config.zerostate,
259            self.step.store.core_storage.clone(),
260            remote_broadcast_listener,
261            self_broadcast_listener,
262            self.base_config,
263        );
264
265        Ok(NodeBaseBuilder {
266            base_config: self.base_config,
267            global_config: self.global_config,
268            step: init::Step2 {
269                prev_step: self.step,
270                blockchain_rpc_client,
271            },
272        })
273    }
274}
275
276impl NodeBaseBuilder<'_, init::Final> {
277    pub fn build(self) -> Result<NodeBase> {
278        let net = self.step.prev_step.prev_step.net;
279        let store = self.step.prev_step.store;
280        let blockchain_rpc_client = self.step.blockchain_rpc_client;
281
282        Ok(NodeBase {
283            base_config: self.base_config.clone(),
284            global_config: self.global_config.clone(),
285            keypair: net.keypair,
286            network: net.network,
287            dht_client: net.dht_client,
288            peer_resolver: net.peer_resolver,
289            overlay_service: net.overlay_service,
290            storage_context: store.context,
291            core_storage: store.core_storage,
292            blockchain_rpc_client,
293        })
294    }
295}
296
297impl<'a, Step> NodeBaseBuilder<'a, Step> {
298    pub fn base_config(&self) -> &'a NodeBaseConfig {
299        self.base_config
300    }
301
302    pub fn global_config(&self) -> &'a GlobalConfig {
303        self.global_config
304    }
305}
306
307impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
308    pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
309        &self.step.as_ref().net.keypair
310    }
311
312    pub fn network(&self) -> &Network {
313        &self.step.as_ref().net.network
314    }
315
316    pub fn dht_client(&self) -> &DhtClient {
317        &self.step.as_ref().net.dht_client
318    }
319
320    pub fn peer_resolver(&self) -> &PeerResolver {
321        &self.step.as_ref().net.peer_resolver
322    }
323
324    pub fn overlay_service(&self) -> &OverlayService {
325        &self.step.as_ref().net.overlay_service
326    }
327}
328
329impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
330    pub fn storage_context(&self) -> &StorageContext {
331        &self.step.as_ref().store.context
332    }
333
334    pub fn core_storage(&self) -> &CoreStorage {
335        &self.step.as_ref().store.core_storage
336    }
337}
338
339impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
340    pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
341        &self.step.as_ref().blockchain_rpc_client
342    }
343}
344
345pub mod init {
346    use super::*;
347
348    pub type Final = Step2;
349
350    /// Node with network.
351    pub struct Step0 {
352        pub(super) net: ConfiguredNetwork,
353    }
354
355    impl AsRef<Step0> for Step0 {
356        #[inline]
357        fn as_ref(&self) -> &Step0 {
358            self
359        }
360    }
361
362    /// Node with network and storage.
363    pub struct Step1 {
364        pub(super) prev_step: Step0,
365        pub(super) store: ConfiguredStorage,
366    }
367
368    impl AsRef<Step0> for Step1 {
369        #[inline]
370        fn as_ref(&self) -> &Step0 {
371            &self.prev_step
372        }
373    }
374
375    impl AsRef<Step1> for Step1 {
376        #[inline]
377        fn as_ref(&self) -> &Step1 {
378            self
379        }
380    }
381
382    /// Node with network, storage and public overlay.
383    pub struct Step2 {
384        pub(super) prev_step: Step1,
385        pub(super) blockchain_rpc_client: BlockchainRpcClient,
386    }
387
388    impl AsRef<Step0> for Step2 {
389        #[inline]
390        fn as_ref(&self) -> &Step0 {
391            &self.prev_step.prev_step
392        }
393    }
394
395    impl AsRef<Step1> for Step2 {
396        #[inline]
397        fn as_ref(&self) -> &Step1 {
398            &self.prev_step
399        }
400    }
401
402    impl AsRef<Step2> for Step2 {
403        #[inline]
404        fn as_ref(&self) -> &Step2 {
405            self
406        }
407    }
408}
409
410pub struct ConfiguredNetwork {
411    pub keypair: Arc<ed25519::KeyPair>,
412    pub network: Network,
413    pub dht_client: DhtClient,
414    pub peer_resolver: PeerResolver,
415    pub overlay_service: OverlayService,
416}
417
418impl ConfiguredNetwork {
419    pub fn new(
420        public_addr: SocketAddr,
421        secret_key: &ed25519::SecretKey,
422        base_config: &NodeBaseConfig,
423        bootstrap_peers: &[PeerInfo],
424    ) -> Result<Self> {
425        // Setup network
426        let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
427        let local_id = keypair.public_key.into();
428
429        let (dht_tasks, dht_service) = DhtService::builder(local_id)
430            .with_config(base_config.dht.clone())
431            .build();
432
433        let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
434            .with_config(base_config.overlay.clone())
435            .with_dht_service(dht_service.clone())
436            .build();
437
438        let router = Router::builder()
439            .route(dht_service.clone())
440            .route(overlay_service.clone())
441            .build();
442
443        let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
444
445        let network = Network::builder()
446            .with_config(base_config.network.clone())
447            .with_private_key(secret_key.to_bytes())
448            .with_remote_addr(public_addr)
449            .build(local_addr, router)
450            .context("failed to build node network")?;
451
452        dht_tasks.spawn(&network);
453        overlay_tasks.spawn(&network);
454
455        let dht_client = dht_service.make_client(&network);
456        let peer_resolver = dht_service
457            .make_peer_resolver()
458            .with_config(base_config.peer_resolver.clone())
459            .build(&network);
460
461        let mut bootstrap_peer_count = 0usize;
462        for peer in bootstrap_peers {
463            let is_new = dht_client.add_peer(Arc::new(peer.clone()))?;
464            bootstrap_peer_count += is_new as usize;
465        }
466
467        tracing::info!(
468            %local_id,
469            %local_addr,
470            %public_addr,
471            bootstrap_peers = bootstrap_peer_count,
472            "initialized network"
473        );
474
475        Ok(Self {
476            keypair,
477            network,
478            dht_client,
479            peer_resolver,
480            overlay_service,
481        })
482    }
483
484    pub fn add_blockchain_rpc<BL, SL>(
485        &self,
486        zerostate: &ZerostateId,
487        storage: CoreStorage,
488        remote_broadcast_listener: BL,
489        self_broadcast_listener: SL,
490        base_config: &NodeBaseConfig,
491    ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
492    where
493        BL: BroadcastListener,
494        SL: SelfBroadcastListener,
495    {
496        let blockchain_rpc_service = BlockchainRpcService::builder()
497            .with_config(base_config.blockchain_rpc_service.clone())
498            .with_storage(storage)
499            .with_broadcast_listener(remote_broadcast_listener)
500            .build();
501
502        let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
503            .named("blockchain_rpc")
504            .with_peer_resolver(self.peer_resolver.clone())
505            .build(blockchain_rpc_service.clone());
506        self.overlay_service.add_public_overlay(&public_overlay);
507
508        let blockchain_rpc_client = BlockchainRpcClient::builder()
509            .with_config(base_config.blockchain_rpc_client.clone())
510            .with_public_overlay_client(PublicOverlayClient::new(
511                self.network.clone(),
512                public_overlay,
513                base_config.public_overlay_client.clone(),
514            ))
515            .with_self_broadcast_listener(self_broadcast_listener)
516            .build();
517
518        tracing::info!(
519            overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
520            "initialized blockchain rpc"
521        );
522
523        (blockchain_rpc_service, blockchain_rpc_client)
524    }
525}
526
527pub struct ConfiguredStorage {
528    pub context: StorageContext,
529    pub core_storage: CoreStorage,
530}
531
532impl ConfiguredStorage {
533    pub async fn new(
534        storage_config: &StorageConfig,
535        core_storage_config: &CoreStorageConfig,
536    ) -> Result<Self> {
537        let context = StorageContext::new(storage_config.clone())
538            .await
539            .context("failed to create storage context")?;
540        let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
541            .await
542            .context("failed to create storage")?;
543        tracing::info!(
544            root_dir = %core_storage.context().root_dir().path().display(),
545            "initialized storage"
546        );
547
548        Ok(Self {
549            context,
550            core_storage,
551        })
552    }
553}