1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8 DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13#[cfg(feature = "cli")]
14pub use self::cli::{CmdRunArgs, CmdRunOnlyArgs, CmdRunStatus, LightNodeConfig, LightNodeContext};
15pub use self::config::NodeBaseConfig;
16pub use self::keys::NodeKeys;
17use crate::block_strider::{
18 ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
19 ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
20 StorageBlockProvider,
21};
22use crate::blockchain_rpc::{
23 BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
24};
25use crate::global_config::{GlobalConfig, ZerostateId};
26use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
27#[cfg(feature = "s3")]
28use crate::s3::S3Client;
29use crate::storage::{CoreStorage, CoreStorageConfig};
30
31#[cfg(feature = "cli")]
32mod cli;
33mod config;
34mod keys;
35
/// Arguments controlling how the node boots.
///
/// Consumed by `NodeBase::init_ext` and `NodeBase::boot_ext`.
pub struct NodeBootArgs {
    /// Cold boot mode passed to the starter's `cold_boot` call.
    pub boot_type: ColdBootType,
    /// Optional zerostate file paths; when present they are wrapped into a
    /// `FileZerostateProvider` for the cold boot.
    pub zerostates: Option<Vec<PathBuf>>,
    /// Optional handler installed on the starter via `with_queue_state_handler`.
    pub queue_state_handler: Option<Box<dyn QueueStateHandler>>,
    /// Value forwarded to the starter's `ignore_states` option.
    pub ignore_states: bool,
}
46
47impl Default for NodeBootArgs {
48 #[inline]
49 fn default() -> Self {
50 Self {
51 boot_type: ColdBootType::LatestPersistent,
52 zerostates: None,
53 queue_state_handler: None,
54 ignore_states: false,
55 }
56 }
57}
58
/// Bundle of the configured network, storage and blockchain RPC handles of a node.
///
/// Produced by `NodeBaseBuilder::build`.
pub struct NodeBase {
    /// Owned copy of the node config the builder was created with.
    pub base_config: NodeBaseConfig,
    /// Owned copy of the global config the builder was created with.
    pub global_config: GlobalConfig,
    /// Number of neighbours `init_ext` waits for before booting.
    pub initial_peer_count: usize,

    /// Key pair derived from the node secret key.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network handle built in `ConfiguredNetwork::new`.
    pub network: Network,
    /// DHT client created from the DHT service for `network`.
    pub dht_client: DhtClient,
    /// Peer resolver created from the DHT service for `network`.
    pub peer_resolver: PeerResolver,
    /// Overlay service the blockchain RPC public overlay is registered on.
    pub overlay_service: OverlayService,

    /// Storage context the core storage was opened with.
    pub storage_context: StorageContext,
    /// Core storage handle.
    pub core_storage: CoreStorage,

    /// Client for the blockchain RPC public overlay.
    pub blockchain_rpc_client: BlockchainRpcClient,

    /// S3 client; `None` when `base_config.s3_client` is not set.
    #[cfg(feature = "s3")]
    pub s3_client: Option<S3Client>,
}
78
79impl NodeBase {
80 const DEFAULT_INITIAL_PEER_COUNT: usize = 3;
81
82 pub fn builder<'a>(
83 base_config: &'a NodeBaseConfig,
84 global_config: &'a GlobalConfig,
85 ) -> NodeBaseBuilder<'a, ()> {
86 crate::record_version_metric();
87 NodeBaseBuilder::new(base_config, global_config)
88 }
89
90 pub async fn init(
92 &self,
93 boot_type: ColdBootType,
94 import_zerostate: Option<Vec<PathBuf>>,
95 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
96 ) -> Result<BlockId> {
97 self.init_ext(NodeBootArgs {
98 boot_type,
99 zerostates: import_zerostate,
100 queue_state_handler,
101 ..Default::default()
102 })
103 .await
104 }
105
106 pub async fn init_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
108 self.wait_for_neighbours(self.initial_peer_count).await;
109
110 let init_block_id = self.boot_ext(args).await.context("failed to init node")?;
111 tracing::info!(%init_block_id, "node initialized");
112
113 Ok(init_block_id)
114 }
115
116 pub async fn wait_for_neighbours(&self, count: usize) {
118 tracing::info!("waiting for initial neighbours");
120 self.blockchain_rpc_client
121 .overlay_client()
122 .neighbours()
123 .wait_for_peers(count)
124 .await;
125 tracing::info!("found initial neighbours");
126 }
127
128 pub async fn boot(
130 &self,
131 boot_type: ColdBootType,
132 zerostates: Option<Vec<PathBuf>>,
133 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
134 ) -> Result<BlockId> {
135 self.boot_ext(NodeBootArgs {
136 boot_type,
137 zerostates,
138 queue_state_handler,
139 ..Default::default()
140 })
141 .await
142 }
143
144 pub async fn boot_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
146 let node_state = self.core_storage.node_state();
147
148 let last_mc_block_id = match node_state.load_last_mc_block_id() {
149 Some(block_id) => block_id,
150 None => {
151 let mut starter = Starter::builder()
152 .with_storage(self.core_storage.clone())
153 .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
154 .with_zerostate_id(self.global_config.zerostate)
155 .with_config(self.base_config.starter.clone())
156 .ignore_states(args.ignore_states);
157
158 if let Some(handler) = args.queue_state_handler {
159 starter = starter.with_queue_state_handler(handler);
160 }
161
162 #[cfg(feature = "s3")]
163 if let Some(s3_client) = self.s3_client.as_ref() {
164 starter = starter.with_s3_client(s3_client.clone());
165 }
166
167 starter
168 .build()
169 .cold_boot(args.boot_type, args.zerostates.map(FileZerostateProvider))
170 .await?
171 }
172 };
173
174 tracing::info!(
175 %last_mc_block_id,
176 "boot finished"
177 );
178
179 Ok(last_mc_block_id)
180 }
181
182 pub fn validator_resolver(&self) -> &ValidatorsResolver {
183 self.blockchain_rpc_client
184 .overlay_client()
185 .validators_resolver()
186 }
187
188 pub fn update_validator_set(&self, vset: &ValidatorSet) {
190 self.validator_resolver().update_validator_set(vset);
191 }
192
193 pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
195 let mc_state = self
197 .core_storage
198 .shard_state_storage()
199 .load_state(block_id.seqno, block_id)
200 .await
201 .context("update_validator_set_from_shard_state failed to load state")?;
202
203 let config = mc_state.config_params()?;
204 let current_vset = config.get_current_validator_set()?;
205 self.update_validator_set(¤t_vset);
206 Ok(())
207 }
208
209 pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
210 ArchiveBlockProvider::new(
211 (
212 self.blockchain_rpc_client.clone(),
213 #[cfg(feature = "s3")]
214 self.s3_client.clone(),
215 ),
216 self.core_storage.clone(),
217 self.base_config.archive_block_provider.clone(),
218 )
219 }
220
221 pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
222 BlockchainBlockProvider::new(
223 self.blockchain_rpc_client.clone(),
224 self.core_storage.clone(),
225 self.base_config.blockchain_block_provider.clone(),
226 )
227 }
228
229 pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
230 StorageBlockProvider::new(self.core_storage.clone())
231 }
232
233 pub fn build_strider<P, S>(
235 &self,
236 provider: P,
237 subscriber: S,
238 ) -> BlockStrider<PersistentBlockStriderState, P, S>
239 where
240 P: BlockProvider,
241 S: BlockSubscriber,
242 {
243 let state = PersistentBlockStriderState::new(
244 self.global_config.zerostate.as_block_id(),
245 self.core_storage.clone(),
246 );
247
248 BlockStrider::builder()
249 .with_state(state)
250 .with_provider(provider)
251 .with_block_subscriber(subscriber)
252 .build()
253 }
254}
255
/// Staged builder for [`NodeBase`].
///
/// `Step` holds what has been initialized so far (see the `init` module);
/// each `init_*` method consumes the builder and returns it at the next step.
pub struct NodeBaseBuilder<'a, Step = ()> {
    /// Settings shared by every step: config references and the peer count.
    common: NodeBaseBuilderCommon<'a>,
    /// State accumulated by the steps completed so far.
    step: Step,
}
260
261impl<'a> NodeBaseBuilder<'a, ()> {
262 pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
263 Self {
264 common: NodeBaseBuilderCommon {
265 base_config,
266 global_config,
267 initial_peer_count: NodeBase::DEFAULT_INITIAL_PEER_COUNT,
268 },
269 step: (),
270 }
271 }
272
273 pub fn init_network(
274 self,
275 public_addr: SocketAddr,
276 secret_key: &ed25519::SecretKey,
277 ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
278 let net = ConfiguredNetwork::new(
279 public_addr,
280 secret_key,
281 self.common.base_config,
282 &self.common.global_config.bootstrap_peers,
283 )?;
284
285 Ok(NodeBaseBuilder {
286 common: self.common,
287 step: init::Step0 { net },
288 })
289 }
290}
291
292impl<'a> NodeBaseBuilder<'a, init::Step0> {
293 pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
295 let store = ConfiguredStorage::new(
296 &self.common.base_config.storage,
297 &self.common.base_config.core_storage,
298 )
299 .await?;
300
301 Ok(NodeBaseBuilder {
302 common: self.common,
303 step: init::Step1 {
304 prev_step: self.step,
305 store,
306 },
307 })
308 }
309}
310
311impl<'a> NodeBaseBuilder<'a, init::Step1> {
312 pub fn init_blockchain_rpc<RL, SL>(
313 self,
314 remote_broadcast_listener: RL,
315 self_broadcast_listener: SL,
316 ) -> Result<NodeBaseBuilder<'a, init::Step2>>
317 where
318 RL: BroadcastListener,
319 SL: SelfBroadcastListener,
320 {
321 let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
322 &self.common.global_config.zerostate,
323 self.step.store.core_storage.clone(),
324 remote_broadcast_listener,
325 self_broadcast_listener,
326 self.common.base_config,
327 );
328
329 Ok(NodeBaseBuilder {
330 common: self.common,
331 step: init::Step2 {
332 prev_step: self.step,
333 blockchain_rpc_client,
334 },
335 })
336 }
337}
338
339impl<'a> NodeBaseBuilder<'a, init::Final> {
340 pub fn build(self) -> Result<NodeBase> {
341 let net = self.step.prev_step.prev_step.net;
342 let store = self.step.prev_step.store;
343 let blockchain_rpc_client = self.step.blockchain_rpc_client;
344
345 Ok(NodeBase {
346 base_config: self.common.base_config.clone(),
347 global_config: self.common.global_config.clone(),
348 initial_peer_count: self.common.initial_peer_count,
349 keypair: net.keypair,
350 network: net.network,
351 dht_client: net.dht_client,
352 peer_resolver: net.peer_resolver,
353 overlay_service: net.overlay_service,
354 storage_context: store.context,
355 core_storage: store.core_storage,
356 blockchain_rpc_client,
357 #[cfg(feature = "s3")]
358 s3_client: self
359 .common
360 .base_config
361 .s3_client
362 .as_ref()
363 .map(S3Client::new)
364 .transpose()
365 .context("failed to create S3 client")?,
366 })
367 }
368}
369
370impl<'a, Step> NodeBaseBuilder<'a, Step> {
371 pub fn base_config(&self) -> &'a NodeBaseConfig {
372 self.common.base_config
373 }
374
375 pub fn global_config(&self) -> &'a GlobalConfig {
376 self.common.global_config
377 }
378
379 pub fn initial_peer_count(&self) -> usize {
380 self.common.initial_peer_count
381 }
382
383 pub fn with_initial_peer_count(mut self, count: usize) -> Self {
384 self.common.initial_peer_count = count;
385 self
386 }
387}
388
389impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
390 pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
391 &self.step.as_ref().net.keypair
392 }
393
394 pub fn network(&self) -> &Network {
395 &self.step.as_ref().net.network
396 }
397
398 pub fn dht_client(&self) -> &DhtClient {
399 &self.step.as_ref().net.dht_client
400 }
401
402 pub fn peer_resolver(&self) -> &PeerResolver {
403 &self.step.as_ref().net.peer_resolver
404 }
405
406 pub fn overlay_service(&self) -> &OverlayService {
407 &self.step.as_ref().net.overlay_service
408 }
409}
410
411impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
412 pub fn storage_context(&self) -> &StorageContext {
413 &self.step.as_ref().store.context
414 }
415
416 pub fn core_storage(&self) -> &CoreStorage {
417 &self.step.as_ref().store.core_storage
418 }
419}
420
421impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
422 pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
423 &self.step.as_ref().blockchain_rpc_client
424 }
425}
426
/// Builder settings that stay the same across all steps.
struct NodeBaseBuilderCommon<'a> {
    /// Borrowed node config; cloned into `NodeBase` by `build`.
    base_config: &'a NodeBaseConfig,
    /// Borrowed global config; cloned into `NodeBase` by `build`.
    global_config: &'a GlobalConfig,
    /// Peer count copied into `NodeBase::initial_peer_count` by `build`.
    initial_peer_count: usize,
}
432
433pub mod init {
434 use super::*;
435
436 pub type Final = Step2;
437
438 pub struct Step0 {
440 pub(super) net: ConfiguredNetwork,
441 }
442
443 impl AsRef<Step0> for Step0 {
444 #[inline]
445 fn as_ref(&self) -> &Step0 {
446 self
447 }
448 }
449
450 pub struct Step1 {
452 pub(super) prev_step: Step0,
453 pub(super) store: ConfiguredStorage,
454 }
455
456 impl AsRef<Step0> for Step1 {
457 #[inline]
458 fn as_ref(&self) -> &Step0 {
459 &self.prev_step
460 }
461 }
462
463 impl AsRef<Step1> for Step1 {
464 #[inline]
465 fn as_ref(&self) -> &Step1 {
466 self
467 }
468 }
469
470 pub struct Step2 {
472 pub(super) prev_step: Step1,
473 pub(super) blockchain_rpc_client: BlockchainRpcClient,
474 }
475
476 impl AsRef<Step0> for Step2 {
477 #[inline]
478 fn as_ref(&self) -> &Step0 {
479 &self.prev_step.prev_step
480 }
481 }
482
483 impl AsRef<Step1> for Step2 {
484 #[inline]
485 fn as_ref(&self) -> &Step1 {
486 &self.prev_step
487 }
488 }
489
490 impl AsRef<Step2> for Step2 {
491 #[inline]
492 fn as_ref(&self) -> &Step2 {
493 self
494 }
495 }
496}
497
/// Network-side handles created together by `ConfiguredNetwork::new`.
pub struct ConfiguredNetwork {
    /// Key pair derived from the secret key passed to `new`.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network built with the DHT and overlay services routed on it.
    pub network: Network,
    /// Client made from the DHT service for `network`.
    pub dht_client: DhtClient,
    /// Peer resolver made from the DHT service for `network`.
    pub peer_resolver: PeerResolver,
    /// Overlay service that public overlays are added to.
    pub overlay_service: OverlayService,
}
505
506impl ConfiguredNetwork {
507 pub fn new(
508 public_addr: SocketAddr,
509 secret_key: &ed25519::SecretKey,
510 base_config: &NodeBaseConfig,
511 bootstrap_peers: &[PeerInfo],
512 ) -> Result<Self> {
513 let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
515 let local_id = keypair.public_key.into();
516
517 let (dht_tasks, dht_service) = DhtService::builder(local_id)
518 .with_config(base_config.dht.clone())
519 .build();
520
521 let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
522 .with_config(base_config.overlay.clone())
523 .with_dht_service(dht_service.clone())
524 .build();
525
526 let router = Router::builder()
527 .route(dht_service.clone())
528 .route(overlay_service.clone())
529 .build();
530
531 let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
532
533 let network = Network::builder()
534 .with_config(base_config.network.clone())
535 .with_private_key(secret_key.to_bytes())
536 .with_remote_addr(public_addr)
537 .build(local_addr, router)
538 .context("failed to build node network")?;
539
540 let bootstrap_peer_count = dht_tasks.spawn(&network, bootstrap_peers)?;
541 overlay_tasks.spawn(&network);
542
543 let dht_client = dht_service.make_client(&network);
544 let peer_resolver = dht_service
545 .make_peer_resolver()
546 .with_config(base_config.peer_resolver.clone())
547 .build(&network);
548
549 tracing::info!(
550 %local_id,
551 %local_addr,
552 %public_addr,
553 bootstrap_peers = bootstrap_peer_count,
554 "initialized network"
555 );
556
557 Ok(Self {
558 keypair,
559 network,
560 dht_client,
561 peer_resolver,
562 overlay_service,
563 })
564 }
565
566 pub fn add_blockchain_rpc<BL, SL>(
567 &self,
568 zerostate: &ZerostateId,
569 storage: CoreStorage,
570 remote_broadcast_listener: BL,
571 self_broadcast_listener: SL,
572 base_config: &NodeBaseConfig,
573 ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
574 where
575 BL: BroadcastListener,
576 SL: SelfBroadcastListener,
577 {
578 let blockchain_rpc_service = BlockchainRpcService::builder()
579 .with_config(base_config.blockchain_rpc_service.clone())
580 .with_storage(storage)
581 .with_broadcast_listener(remote_broadcast_listener)
582 .build();
583
584 let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
585 .named("blockchain_rpc")
586 .with_peer_resolver(self.peer_resolver.clone())
587 .build(blockchain_rpc_service.clone());
588 self.overlay_service.add_public_overlay(&public_overlay);
589
590 let blockchain_rpc_client = BlockchainRpcClient::builder()
591 .with_config(base_config.blockchain_rpc_client.clone())
592 .with_public_overlay_client(PublicOverlayClient::new(
593 self.network.clone(),
594 public_overlay,
595 base_config.public_overlay_client.clone(),
596 ))
597 .with_self_broadcast_listener(self_broadcast_listener)
598 .build();
599
600 tracing::info!(
601 overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
602 "initialized blockchain rpc"
603 );
604
605 (blockchain_rpc_service, blockchain_rpc_client)
606 }
607}
608
/// Storage-side handles created together by `ConfiguredStorage::new`.
pub struct ConfiguredStorage {
    /// Storage context created from the storage config.
    pub context: StorageContext,
    /// Core storage opened on a clone of `context`.
    pub core_storage: CoreStorage,
}
613
614impl ConfiguredStorage {
615 pub async fn new(
616 storage_config: &StorageConfig,
617 core_storage_config: &CoreStorageConfig,
618 ) -> Result<Self> {
619 let context = StorageContext::new(storage_config.clone())
620 .await
621 .context("failed to create storage context")?;
622 let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
623 .await
624 .context("failed to create storage")?;
625 tracing::info!(
626 root_dir = %core_storage.context().root_dir().path().display(),
627 "initialized storage"
628 );
629
630 Ok(Self {
631 context,
632 core_storage,
633 })
634 }
635}