1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8 DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13#[cfg(feature = "cli")]
14pub use self::cli::{CmdRunArgs, CmdRunOnlyArgs, CmdRunStatus, LightNodeConfig, LightNodeContext};
15pub use self::config::NodeBaseConfig;
16pub use self::keys::NodeKeys;
17use crate::block_strider::{
18 ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
19 ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
20 StorageBlockProvider,
21};
22#[cfg(feature = "s3")]
23use crate::blockchain_rpc::S3RpcDataProvider;
24use crate::blockchain_rpc::{
25 BlockchainRpcClient, BlockchainRpcService, BroadcastListener, IntoRpcDataProvider,
26 SelfBroadcastListener, StorageRpcDataProvider,
27};
28use crate::global_config::{GlobalConfig, ZerostateId};
29use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
30#[cfg(feature = "s3")]
31use crate::s3::S3Client;
32use crate::storage::{CoreStorage, CoreStorageConfig};
33
34#[cfg(feature = "cli")]
35mod cli;
36mod config;
37mod keys;
38
39pub struct NodeBootArgs {
40 pub boot_type: ColdBootType,
42 pub zerostates: Option<Vec<PathBuf>>,
44 pub queue_state_handler: Option<Box<dyn QueueStateHandler>>,
46 pub ignore_states: bool,
48}
49
50impl Default for NodeBootArgs {
51 #[inline]
52 fn default() -> Self {
53 Self {
54 boot_type: ColdBootType::LatestPersistent,
55 zerostates: None,
56 queue_state_handler: None,
57 ignore_states: false,
58 }
59 }
60}
61
62pub struct NodeBase {
63 pub base_config: NodeBaseConfig,
64 pub global_config: GlobalConfig,
65 pub initial_peer_count: usize,
66
67 pub keypair: Arc<ed25519::KeyPair>,
68 pub network: Network,
69 pub dht_client: DhtClient,
70 pub peer_resolver: PeerResolver,
71 pub overlay_service: OverlayService,
72
73 pub storage_context: StorageContext,
74 pub core_storage: CoreStorage,
75
76 pub blockchain_rpc_client: BlockchainRpcClient,
77
78 #[cfg(feature = "s3")]
79 pub s3_client: Option<S3Client>,
80}
81
82impl NodeBase {
83 const DEFAULT_INITIAL_PEER_COUNT: usize = 3;
84
85 pub fn builder<'a>(
86 base_config: &'a NodeBaseConfig,
87 global_config: &'a GlobalConfig,
88 ) -> NodeBaseBuilder<'a, ()> {
89 crate::record_version_metric();
90 NodeBaseBuilder::new(base_config, global_config)
91 }
92
93 pub async fn init(
95 &self,
96 boot_type: ColdBootType,
97 import_zerostate: Option<Vec<PathBuf>>,
98 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
99 ) -> Result<BlockId> {
100 self.init_ext(NodeBootArgs {
101 boot_type,
102 zerostates: import_zerostate,
103 queue_state_handler,
104 ..Default::default()
105 })
106 .await
107 }
108
109 pub async fn init_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
111 self.wait_for_neighbours(self.initial_peer_count).await;
112
113 let init_block_id = self.boot_ext(args).await.context("failed to init node")?;
114 tracing::info!(%init_block_id, "node initialized");
115
116 Ok(init_block_id)
117 }
118
119 pub async fn wait_for_neighbours(&self, count: usize) {
121 tracing::info!("waiting for initial neighbours");
123 self.blockchain_rpc_client
124 .overlay_client()
125 .neighbours()
126 .wait_for_peers(count)
127 .await;
128 tracing::info!("found initial neighbours");
129 }
130
131 pub async fn boot(
133 &self,
134 boot_type: ColdBootType,
135 zerostates: Option<Vec<PathBuf>>,
136 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
137 ) -> Result<BlockId> {
138 self.boot_ext(NodeBootArgs {
139 boot_type,
140 zerostates,
141 queue_state_handler,
142 ..Default::default()
143 })
144 .await
145 }
146
147 pub async fn boot_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
149 let node_state = self.core_storage.node_state();
150
151 let last_mc_block_id = match node_state.load_last_mc_block_id() {
152 Some(block_id) => block_id,
153 None => {
154 let mut starter = Starter::builder()
155 .with_storage(self.core_storage.clone())
156 .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
157 .with_zerostate_id(self.global_config.zerostate)
158 .with_config(self.base_config.starter.clone())
159 .ignore_states(args.ignore_states);
160
161 if let Some(handler) = args.queue_state_handler {
162 starter = starter.with_queue_state_handler(handler);
163 }
164
165 #[cfg(feature = "s3")]
166 if let Some(s3_client) = self.s3_client.as_ref() {
167 starter = starter.with_s3_client(s3_client.clone());
168 }
169
170 starter
171 .build()
172 .cold_boot(args.boot_type, args.zerostates.map(FileZerostateProvider))
173 .await?
174 }
175 };
176
177 tracing::info!(
178 %last_mc_block_id,
179 "boot finished"
180 );
181
182 Ok(last_mc_block_id)
183 }
184
185 pub fn validator_resolver(&self) -> &ValidatorsResolver {
186 self.blockchain_rpc_client
187 .overlay_client()
188 .validators_resolver()
189 }
190
191 pub fn update_validator_set(&self, vset: &ValidatorSet) {
193 self.validator_resolver().update_validator_set(vset);
194 }
195
196 pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
198 let mc_state = self
200 .core_storage
201 .shard_state_storage()
202 .load_state(block_id.seqno, block_id)
203 .await
204 .context("update_validator_set_from_shard_state failed to load state")?;
205
206 let config = mc_state.config_params()?;
207 let current_vset = config.get_current_validator_set()?;
208 self.update_validator_set(¤t_vset);
209 Ok(())
210 }
211
212 pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
213 ArchiveBlockProvider::new(
214 (
215 self.blockchain_rpc_client.clone(),
216 #[cfg(feature = "s3")]
217 self.s3_client.clone(),
218 ),
219 self.core_storage.clone(),
220 self.base_config.archive_block_provider.clone(),
221 )
222 }
223
224 pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
225 BlockchainBlockProvider::new(
226 self.blockchain_rpc_client.clone(),
227 self.core_storage.clone(),
228 self.base_config.blockchain_block_provider.clone(),
229 )
230 }
231
232 pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
233 StorageBlockProvider::new(self.core_storage.clone())
234 }
235
236 pub fn build_strider<P, S>(
238 &self,
239 provider: P,
240 subscriber: S,
241 ) -> BlockStrider<PersistentBlockStriderState, P, S>
242 where
243 P: BlockProvider,
244 S: BlockSubscriber,
245 {
246 let state = PersistentBlockStriderState::new(
247 self.global_config.zerostate.as_block_id(),
248 self.core_storage.clone(),
249 );
250
251 BlockStrider::builder()
252 .with_state(state)
253 .with_provider(provider)
254 .with_block_subscriber(subscriber)
255 .build()
256 }
257}
258
259pub struct NodeBaseBuilder<'a, Step = ()> {
260 common: NodeBaseBuilderCommon<'a>,
261 step: Step,
262}
263
264impl<'a> NodeBaseBuilder<'a, ()> {
265 pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
266 Self {
267 common: NodeBaseBuilderCommon {
268 base_config,
269 global_config,
270 initial_peer_count: NodeBase::DEFAULT_INITIAL_PEER_COUNT,
271 },
272 step: (),
273 }
274 }
275
276 pub fn init_network(
277 self,
278 public_addr: SocketAddr,
279 secret_key: &ed25519::SecretKey,
280 ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
281 let net = ConfiguredNetwork::new(
282 public_addr,
283 secret_key,
284 self.common.base_config,
285 &self.common.global_config.bootstrap_peers,
286 )?;
287
288 Ok(NodeBaseBuilder {
289 common: self.common,
290 step: init::Step0 { net },
291 })
292 }
293}
294
295impl<'a> NodeBaseBuilder<'a, init::Step0> {
296 pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
298 let store = ConfiguredStorage::new(
299 &self.common.base_config.storage,
300 &self.common.base_config.core_storage,
301 )
302 .await?;
303
304 Ok(NodeBaseBuilder {
305 common: self.common,
306 step: init::Step1 {
307 prev_step: self.step,
308 store,
309 },
310 })
311 }
312}
313
314impl<'a> NodeBaseBuilder<'a, init::Step1> {
315 pub fn init_blockchain_rpc<RL, SL>(
316 self,
317 remote_broadcast_listener: RL,
318 self_broadcast_listener: SL,
319 ) -> Result<NodeBaseBuilder<'a, init::Step2>>
320 where
321 RL: BroadcastListener,
322 SL: SelfBroadcastListener,
323 {
324 #[cfg(feature = "s3")]
325 let s3_client = self
326 .common
327 .base_config
328 .s3_client
329 .as_ref()
330 .map(S3Client::new)
331 .transpose()
332 .context("failed to create S3 client")?;
333
334 let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
335 &self.common.global_config.zerostate,
336 self.step.store.core_storage.clone(),
337 (
338 StorageRpcDataProvider::new(self.step.store.core_storage.clone()),
339 #[cfg(feature = "s3")]
340 s3_client.clone().and_then(|s3_client| {
341 let proxy_config = self
342 .common
343 .base_config
344 .blockchain_rpc_service
345 .s3_proxy
346 .as_ref()?; let storage = self.step.store.core_storage.clone();
348 Some(S3RpcDataProvider::new(s3_client, storage, proxy_config))
349 }),
350 ),
351 remote_broadcast_listener,
352 self_broadcast_listener,
353 self.common.base_config,
354 );
355
356 Ok(NodeBaseBuilder {
357 common: self.common,
358 step: init::Step2 {
359 prev_step: self.step,
360 blockchain_rpc_client,
361 #[cfg(feature = "s3")]
362 s3_client,
363 },
364 })
365 }
366}
367
368impl<'a> NodeBaseBuilder<'a, init::Final> {
369 pub fn build(self) -> Result<NodeBase> {
370 let net = self.step.prev_step.prev_step.net;
371 let store = self.step.prev_step.store;
372 let blockchain_rpc_client = self.step.blockchain_rpc_client;
373 #[cfg(feature = "s3")]
374 let s3_client = self.step.s3_client;
375
376 Ok(NodeBase {
377 base_config: self.common.base_config.clone(),
378 global_config: self.common.global_config.clone(),
379 initial_peer_count: self.common.initial_peer_count,
380 keypair: net.keypair,
381 network: net.network,
382 dht_client: net.dht_client,
383 peer_resolver: net.peer_resolver,
384 overlay_service: net.overlay_service,
385 storage_context: store.context,
386 core_storage: store.core_storage,
387 blockchain_rpc_client,
388 #[cfg(feature = "s3")]
389 s3_client,
390 })
391 }
392}
393
394impl<'a, Step> NodeBaseBuilder<'a, Step> {
395 pub fn base_config(&self) -> &'a NodeBaseConfig {
396 self.common.base_config
397 }
398
399 pub fn global_config(&self) -> &'a GlobalConfig {
400 self.common.global_config
401 }
402
403 pub fn initial_peer_count(&self) -> usize {
404 self.common.initial_peer_count
405 }
406
407 pub fn with_initial_peer_count(mut self, count: usize) -> Self {
408 self.common.initial_peer_count = count;
409 self
410 }
411}
412
413impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
414 pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
415 &self.step.as_ref().net.keypair
416 }
417
418 pub fn network(&self) -> &Network {
419 &self.step.as_ref().net.network
420 }
421
422 pub fn dht_client(&self) -> &DhtClient {
423 &self.step.as_ref().net.dht_client
424 }
425
426 pub fn peer_resolver(&self) -> &PeerResolver {
427 &self.step.as_ref().net.peer_resolver
428 }
429
430 pub fn overlay_service(&self) -> &OverlayService {
431 &self.step.as_ref().net.overlay_service
432 }
433}
434
435impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
436 pub fn storage_context(&self) -> &StorageContext {
437 &self.step.as_ref().store.context
438 }
439
440 pub fn core_storage(&self) -> &CoreStorage {
441 &self.step.as_ref().store.core_storage
442 }
443}
444
445impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
446 pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
447 &self.step.as_ref().blockchain_rpc_client
448 }
449}
450
451struct NodeBaseBuilderCommon<'a> {
452 base_config: &'a NodeBaseConfig,
453 global_config: &'a GlobalConfig,
454 initial_peer_count: usize,
455}
456
457pub mod init {
458 use super::*;
459
460 pub type Final = Step2;
461
462 pub struct Step0 {
464 pub(super) net: ConfiguredNetwork,
465 }
466
467 impl AsRef<Step0> for Step0 {
468 #[inline]
469 fn as_ref(&self) -> &Step0 {
470 self
471 }
472 }
473
474 pub struct Step1 {
476 pub(super) prev_step: Step0,
477 pub(super) store: ConfiguredStorage,
478 }
479
480 impl AsRef<Step0> for Step1 {
481 #[inline]
482 fn as_ref(&self) -> &Step0 {
483 &self.prev_step
484 }
485 }
486
487 impl AsRef<Step1> for Step1 {
488 #[inline]
489 fn as_ref(&self) -> &Step1 {
490 self
491 }
492 }
493
494 pub struct Step2 {
496 pub(super) prev_step: Step1,
497 pub(super) blockchain_rpc_client: BlockchainRpcClient,
498 #[cfg(feature = "s3")]
499 pub(super) s3_client: Option<S3Client>,
500 }
501
502 impl AsRef<Step0> for Step2 {
503 #[inline]
504 fn as_ref(&self) -> &Step0 {
505 &self.prev_step.prev_step
506 }
507 }
508
509 impl AsRef<Step1> for Step2 {
510 #[inline]
511 fn as_ref(&self) -> &Step1 {
512 &self.prev_step
513 }
514 }
515
516 impl AsRef<Step2> for Step2 {
517 #[inline]
518 fn as_ref(&self) -> &Step2 {
519 self
520 }
521 }
522}
523
524pub struct ConfiguredNetwork {
525 pub keypair: Arc<ed25519::KeyPair>,
526 pub network: Network,
527 pub dht_client: DhtClient,
528 pub peer_resolver: PeerResolver,
529 pub overlay_service: OverlayService,
530}
531
532impl ConfiguredNetwork {
533 pub fn new(
534 public_addr: SocketAddr,
535 secret_key: &ed25519::SecretKey,
536 base_config: &NodeBaseConfig,
537 bootstrap_peers: &[PeerInfo],
538 ) -> Result<Self> {
539 let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
541 let local_id = keypair.public_key.into();
542
543 let (dht_tasks, dht_service) = DhtService::builder(local_id)
544 .with_config(base_config.dht.clone())
545 .build();
546
547 let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
548 .with_config(base_config.overlay.clone())
549 .with_dht_service(dht_service.clone())
550 .build();
551
552 let router = Router::builder()
553 .route(dht_service.clone())
554 .route(overlay_service.clone())
555 .build();
556
557 let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
558
559 let network = Network::builder()
560 .with_config(base_config.network.clone())
561 .with_private_key(secret_key.to_bytes())
562 .with_remote_addr(public_addr)
563 .build(local_addr, router)
564 .context("failed to build node network")?;
565
566 let bootstrap_peer_count = dht_tasks.spawn(&network, bootstrap_peers)?;
567 overlay_tasks.spawn(&network);
568
569 let dht_client = dht_service.make_client(&network);
570 let peer_resolver = dht_service
571 .make_peer_resolver()
572 .with_config(base_config.peer_resolver.clone())
573 .build(&network);
574
575 tracing::info!(
576 %local_id,
577 %local_addr,
578 %public_addr,
579 bootstrap_peers = bootstrap_peer_count,
580 "initialized network"
581 );
582
583 Ok(Self {
584 keypair,
585 network,
586 dht_client,
587 peer_resolver,
588 overlay_service,
589 })
590 }
591
592 pub fn add_blockchain_rpc<BL, SL, RP>(
593 &self,
594 zerostate: &ZerostateId,
595 storage: CoreStorage,
596 rpc_provider: RP,
597 remote_broadcast_listener: BL,
598 self_broadcast_listener: SL,
599 base_config: &NodeBaseConfig,
600 ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
601 where
602 BL: BroadcastListener,
603 SL: SelfBroadcastListener,
604 RP: IntoRpcDataProvider,
605 {
606 let blockchain_rpc_service = BlockchainRpcService::builder()
607 .with_config(base_config.blockchain_rpc_service.clone())
608 .with_storage(storage)
609 .with_data_provider(rpc_provider)
610 .with_broadcast_listener(remote_broadcast_listener)
611 .build();
612
613 let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
614 .named("blockchain_rpc")
615 .with_peer_resolver(self.peer_resolver.clone())
616 .build(blockchain_rpc_service.clone());
617 self.overlay_service.add_public_overlay(&public_overlay);
618
619 let blockchain_rpc_client = BlockchainRpcClient::builder()
620 .with_config(base_config.blockchain_rpc_client.clone())
621 .with_public_overlay_client(PublicOverlayClient::new(
622 self.network.clone(),
623 public_overlay,
624 base_config.public_overlay_client.clone(),
625 ))
626 .with_self_broadcast_listener(self_broadcast_listener)
627 .build();
628
629 tracing::info!(
630 overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
631 "initialized blockchain rpc"
632 );
633
634 (blockchain_rpc_service, blockchain_rpc_client)
635 }
636}
637
638pub struct ConfiguredStorage {
639 pub context: StorageContext,
640 pub core_storage: CoreStorage,
641}
642
643impl ConfiguredStorage {
644 pub async fn new(
645 storage_config: &StorageConfig,
646 core_storage_config: &CoreStorageConfig,
647 ) -> Result<Self> {
648 let context = StorageContext::new(storage_config.clone())
649 .await
650 .context("failed to create storage context")?;
651 let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
652 .await
653 .context("failed to create storage")?;
654 tracing::info!(
655 root_dir = %core_storage.context().root_dir().path().display(),
656 "initialized storage"
657 );
658
659 Ok(Self {
660 context,
661 core_storage,
662 })
663 }
664}