1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8 DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13#[cfg(feature = "cli")]
14pub use self::cli::{CmdRunArgs, CmdRunOnlyArgs, CmdRunStatus, LightNodeConfig, LightNodeContext};
15pub use self::config::NodeBaseConfig;
16pub use self::keys::NodeKeys;
17use crate::block_strider::{
18 ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
19 ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
20 StorageBlockProvider,
21};
22use crate::blockchain_rpc::{
23 BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
24};
25use crate::global_config::{GlobalConfig, ZerostateId};
26use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
27#[cfg(feature = "s3")]
28use crate::s3::S3Client;
29use crate::storage::{CoreStorage, CoreStorageConfig};
30
31#[cfg(feature = "cli")]
32mod cli;
33mod config;
34mod keys;
35
/// Arguments that control how a node is booted (consumed by `NodeBase::boot_ext`).
pub struct NodeBootArgs {
    /// Boot type forwarded to the starter's `cold_boot` call.
    pub boot_type: ColdBootType,
    /// Optional list of zerostate file paths. When set, the list is wrapped
    /// into a `FileZerostateProvider` and handed to `cold_boot`.
    pub zerostates: Option<Vec<PathBuf>>,
    /// Optional handler installed on the starter via `with_queue_state_handler`.
    pub queue_state_handler: Option<Box<dyn QueueStateHandler>>,
    /// Flag forwarded to the starter builder's `ignore_states` option.
    pub ignore_states: bool,
}
46
47impl Default for NodeBootArgs {
48 #[inline]
49 fn default() -> Self {
50 Self {
51 boot_type: ColdBootType::LatestPersistent,
52 zerostates: None,
53 queue_state_handler: None,
54 ignore_states: false,
55 }
56 }
57}
58
/// Bundle of the configured network, storage and blockchain RPC client that a
/// node is built on top of. Constructed through [`NodeBase::builder`].
pub struct NodeBase {
    /// Owned copy of the base config the builder was created with.
    pub base_config: NodeBaseConfig,
    /// Owned copy of the global config the builder was created with.
    pub global_config: GlobalConfig,
    /// Peer count that `init_ext` passes to `wait_for_neighbours`.
    pub initial_peer_count: usize,

    /// Key pair derived from the secret key given to the network step.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network instance created during the network step.
    pub network: Network,
    /// DHT client created from the DHT service for `network`.
    pub dht_client: DhtClient,
    /// Peer resolver created from the DHT service for `network`.
    pub peer_resolver: PeerResolver,
    /// Overlay service the blockchain RPC public overlay is registered in.
    pub overlay_service: OverlayService,

    /// Storage context created during the storage step.
    pub storage_context: StorageContext,
    /// Core storage opened on top of `storage_context`.
    pub core_storage: CoreStorage,

    /// Blockchain RPC client created during the blockchain RPC step.
    pub blockchain_rpc_client: BlockchainRpcClient,

    /// S3 client, present only when the base config provides S3 settings.
    #[cfg(feature = "s3")]
    pub s3_client: Option<S3Client>,
}
78
79impl NodeBase {
80 const DEFAULT_INITIAL_PEER_COUNT: usize = 3;
81
82 pub fn builder<'a>(
83 base_config: &'a NodeBaseConfig,
84 global_config: &'a GlobalConfig,
85 ) -> NodeBaseBuilder<'a, ()> {
86 crate::record_version_metric();
87 NodeBaseBuilder::new(base_config, global_config)
88 }
89
90 pub async fn init(
92 &self,
93 boot_type: ColdBootType,
94 import_zerostate: Option<Vec<PathBuf>>,
95 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
96 ) -> Result<BlockId> {
97 self.init_ext(NodeBootArgs {
98 boot_type,
99 zerostates: import_zerostate,
100 queue_state_handler,
101 ..Default::default()
102 })
103 .await
104 }
105
106 pub async fn init_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
108 self.wait_for_neighbours(self.initial_peer_count).await;
109
110 let init_block_id = self.boot_ext(args).await.context("failed to init node")?;
111 tracing::info!(%init_block_id, "node initialized");
112
113 Ok(init_block_id)
114 }
115
116 pub async fn wait_for_neighbours(&self, count: usize) {
118 tracing::info!("waiting for initial neighbours");
120 self.blockchain_rpc_client
121 .overlay_client()
122 .neighbours()
123 .wait_for_peers(count)
124 .await;
125 tracing::info!("found initial neighbours");
126 }
127
128 pub async fn boot(
130 &self,
131 boot_type: ColdBootType,
132 zerostates: Option<Vec<PathBuf>>,
133 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
134 ) -> Result<BlockId> {
135 self.boot_ext(NodeBootArgs {
136 boot_type,
137 zerostates,
138 queue_state_handler,
139 ..Default::default()
140 })
141 .await
142 }
143
144 pub async fn boot_ext(&self, args: NodeBootArgs) -> Result<BlockId> {
146 let node_state = self.core_storage.node_state();
147
148 let last_mc_block_id = match node_state.load_last_mc_block_id() {
149 Some(block_id) => block_id,
150 None => {
151 let mut starter = Starter::builder()
152 .with_storage(self.core_storage.clone())
153 .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
154 .with_zerostate_id(self.global_config.zerostate)
155 .with_config(self.base_config.starter.clone())
156 .ignore_states(args.ignore_states);
157
158 if let Some(handler) = args.queue_state_handler {
159 starter = starter.with_queue_state_handler(handler);
160 }
161
162 #[cfg(feature = "s3")]
163 if let Some(s3_client) = self.s3_client.as_ref() {
164 starter = starter.with_s3_client(s3_client.clone());
165 }
166
167 starter
168 .build()
169 .cold_boot(args.boot_type, args.zerostates.map(FileZerostateProvider))
170 .await?
171 }
172 };
173
174 tracing::info!(
175 %last_mc_block_id,
176 "boot finished"
177 );
178
179 Ok(last_mc_block_id)
180 }
181
182 pub fn validator_resolver(&self) -> &ValidatorsResolver {
183 self.blockchain_rpc_client
184 .overlay_client()
185 .validators_resolver()
186 }
187
188 pub fn update_validator_set(&self, vset: &ValidatorSet) {
190 self.validator_resolver().update_validator_set(vset);
191 }
192
193 pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
195 let mc_state = self
197 .core_storage
198 .shard_state_storage()
199 .load_state(block_id.seqno, block_id)
200 .await?;
201
202 let config = mc_state.config_params()?;
203 let current_vset = config.get_current_validator_set()?;
204 self.update_validator_set(¤t_vset);
205 Ok(())
206 }
207
208 pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
209 ArchiveBlockProvider::new(
210 (
211 self.blockchain_rpc_client.clone(),
212 #[cfg(feature = "s3")]
213 self.s3_client.clone(),
214 ),
215 self.core_storage.clone(),
216 self.base_config.archive_block_provider.clone(),
217 )
218 }
219
220 pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
221 BlockchainBlockProvider::new(
222 self.blockchain_rpc_client.clone(),
223 self.core_storage.clone(),
224 self.base_config.blockchain_block_provider.clone(),
225 )
226 }
227
228 pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
229 StorageBlockProvider::new(self.core_storage.clone())
230 }
231
232 pub fn build_strider<P, S>(
234 &self,
235 provider: P,
236 subscriber: S,
237 ) -> BlockStrider<PersistentBlockStriderState, P, S>
238 where
239 P: BlockProvider,
240 S: BlockSubscriber,
241 {
242 let state = PersistentBlockStriderState::new(
243 self.global_config.zerostate.as_block_id(),
244 self.core_storage.clone(),
245 );
246
247 BlockStrider::builder()
248 .with_state(state)
249 .with_provider(provider)
250 .with_block_subscriber(subscriber)
251 .build()
252 }
253}
254
/// Step-typed builder for `NodeBase`.
///
/// `Step` records which initialization stages have already run; the
/// available methods depend on it.
pub struct NodeBaseBuilder<'a, Step = ()> {
    /// Data that is carried unchanged through every step.
    common: NodeBaseBuilderCommon<'a>,
    /// Results accumulated by the steps completed so far.
    step: Step,
}
259
260impl<'a> NodeBaseBuilder<'a, ()> {
261 pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
262 Self {
263 common: NodeBaseBuilderCommon {
264 base_config,
265 global_config,
266 initial_peer_count: NodeBase::DEFAULT_INITIAL_PEER_COUNT,
267 },
268 step: (),
269 }
270 }
271
272 pub fn init_network(
273 self,
274 public_addr: SocketAddr,
275 secret_key: &ed25519::SecretKey,
276 ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
277 let net = ConfiguredNetwork::new(
278 public_addr,
279 secret_key,
280 self.common.base_config,
281 &self.common.global_config.bootstrap_peers,
282 )?;
283
284 Ok(NodeBaseBuilder {
285 common: self.common,
286 step: init::Step0 { net },
287 })
288 }
289}
290
291impl<'a> NodeBaseBuilder<'a, init::Step0> {
292 pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
294 let store = ConfiguredStorage::new(
295 &self.common.base_config.storage,
296 &self.common.base_config.core_storage,
297 )
298 .await?;
299
300 Ok(NodeBaseBuilder {
301 common: self.common,
302 step: init::Step1 {
303 prev_step: self.step,
304 store,
305 },
306 })
307 }
308}
309
310impl<'a> NodeBaseBuilder<'a, init::Step1> {
311 pub fn init_blockchain_rpc<RL, SL>(
312 self,
313 remote_broadcast_listener: RL,
314 self_broadcast_listener: SL,
315 ) -> Result<NodeBaseBuilder<'a, init::Step2>>
316 where
317 RL: BroadcastListener,
318 SL: SelfBroadcastListener,
319 {
320 let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
321 &self.common.global_config.zerostate,
322 self.step.store.core_storage.clone(),
323 remote_broadcast_listener,
324 self_broadcast_listener,
325 self.common.base_config,
326 );
327
328 Ok(NodeBaseBuilder {
329 common: self.common,
330 step: init::Step2 {
331 prev_step: self.step,
332 blockchain_rpc_client,
333 },
334 })
335 }
336}
337
338impl<'a> NodeBaseBuilder<'a, init::Final> {
339 pub fn build(self) -> Result<NodeBase> {
340 let net = self.step.prev_step.prev_step.net;
341 let store = self.step.prev_step.store;
342 let blockchain_rpc_client = self.step.blockchain_rpc_client;
343
344 Ok(NodeBase {
345 base_config: self.common.base_config.clone(),
346 global_config: self.common.global_config.clone(),
347 initial_peer_count: self.common.initial_peer_count,
348 keypair: net.keypair,
349 network: net.network,
350 dht_client: net.dht_client,
351 peer_resolver: net.peer_resolver,
352 overlay_service: net.overlay_service,
353 storage_context: store.context,
354 core_storage: store.core_storage,
355 blockchain_rpc_client,
356 #[cfg(feature = "s3")]
357 s3_client: self
358 .common
359 .base_config
360 .s3_client
361 .as_ref()
362 .map(S3Client::new)
363 .transpose()
364 .context("failed to create S3 client")?,
365 })
366 }
367}
368
369impl<'a, Step> NodeBaseBuilder<'a, Step> {
370 pub fn base_config(&self) -> &'a NodeBaseConfig {
371 self.common.base_config
372 }
373
374 pub fn global_config(&self) -> &'a GlobalConfig {
375 self.common.global_config
376 }
377
378 pub fn initial_peer_count(&self) -> usize {
379 self.common.initial_peer_count
380 }
381
382 pub fn with_initial_peer_count(mut self, count: usize) -> Self {
383 self.common.initial_peer_count = count;
384 self
385 }
386}
387
388impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
389 pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
390 &self.step.as_ref().net.keypair
391 }
392
393 pub fn network(&self) -> &Network {
394 &self.step.as_ref().net.network
395 }
396
397 pub fn dht_client(&self) -> &DhtClient {
398 &self.step.as_ref().net.dht_client
399 }
400
401 pub fn peer_resolver(&self) -> &PeerResolver {
402 &self.step.as_ref().net.peer_resolver
403 }
404
405 pub fn overlay_service(&self) -> &OverlayService {
406 &self.step.as_ref().net.overlay_service
407 }
408}
409
410impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
411 pub fn storage_context(&self) -> &StorageContext {
412 &self.step.as_ref().store.context
413 }
414
415 pub fn core_storage(&self) -> &CoreStorage {
416 &self.step.as_ref().store.core_storage
417 }
418}
419
420impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
421 pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
422 &self.step.as_ref().blockchain_rpc_client
423 }
424}
425
/// Builder data that is carried unchanged from one step to the next.
struct NodeBaseBuilderCommon<'a> {
    /// Borrowed base config; cloned into `NodeBase` by `build`.
    base_config: &'a NodeBaseConfig,
    /// Borrowed global config; cloned into `NodeBase` by `build`.
    global_config: &'a GlobalConfig,
    /// Peer count copied into `NodeBase::initial_peer_count` by `build`.
    initial_peer_count: usize,
}
431
432pub mod init {
433 use super::*;
434
435 pub type Final = Step2;
436
437 pub struct Step0 {
439 pub(super) net: ConfiguredNetwork,
440 }
441
442 impl AsRef<Step0> for Step0 {
443 #[inline]
444 fn as_ref(&self) -> &Step0 {
445 self
446 }
447 }
448
449 pub struct Step1 {
451 pub(super) prev_step: Step0,
452 pub(super) store: ConfiguredStorage,
453 }
454
455 impl AsRef<Step0> for Step1 {
456 #[inline]
457 fn as_ref(&self) -> &Step0 {
458 &self.prev_step
459 }
460 }
461
462 impl AsRef<Step1> for Step1 {
463 #[inline]
464 fn as_ref(&self) -> &Step1 {
465 self
466 }
467 }
468
469 pub struct Step2 {
471 pub(super) prev_step: Step1,
472 pub(super) blockchain_rpc_client: BlockchainRpcClient,
473 }
474
475 impl AsRef<Step0> for Step2 {
476 #[inline]
477 fn as_ref(&self) -> &Step0 {
478 &self.prev_step.prev_step
479 }
480 }
481
482 impl AsRef<Step1> for Step2 {
483 #[inline]
484 fn as_ref(&self) -> &Step1 {
485 &self.prev_step
486 }
487 }
488
489 impl AsRef<Step2> for Step2 {
490 #[inline]
491 fn as_ref(&self) -> &Step2 {
492 self
493 }
494 }
495}
496
/// Network-related handles produced by `ConfiguredNetwork::new`.
pub struct ConfiguredNetwork {
    /// Key pair derived from the secret key passed to `new`.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network built with the DHT and overlay services routed into it.
    pub network: Network,
    /// Client created from the DHT service for `network`.
    pub dht_client: DhtClient,
    /// Peer resolver created from the DHT service for `network`.
    pub peer_resolver: PeerResolver,
    /// Overlay service that public overlays are added to.
    pub overlay_service: OverlayService,
}
504
505impl ConfiguredNetwork {
506 pub fn new(
507 public_addr: SocketAddr,
508 secret_key: &ed25519::SecretKey,
509 base_config: &NodeBaseConfig,
510 bootstrap_peers: &[PeerInfo],
511 ) -> Result<Self> {
512 let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
514 let local_id = keypair.public_key.into();
515
516 let (dht_tasks, dht_service) = DhtService::builder(local_id)
517 .with_config(base_config.dht.clone())
518 .build();
519
520 let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
521 .with_config(base_config.overlay.clone())
522 .with_dht_service(dht_service.clone())
523 .build();
524
525 let router = Router::builder()
526 .route(dht_service.clone())
527 .route(overlay_service.clone())
528 .build();
529
530 let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
531
532 let network = Network::builder()
533 .with_config(base_config.network.clone())
534 .with_private_key(secret_key.to_bytes())
535 .with_remote_addr(public_addr)
536 .build(local_addr, router)
537 .context("failed to build node network")?;
538
539 let bootstrap_peer_count = dht_tasks.spawn(&network, bootstrap_peers)?;
540 overlay_tasks.spawn(&network);
541
542 let dht_client = dht_service.make_client(&network);
543 let peer_resolver = dht_service
544 .make_peer_resolver()
545 .with_config(base_config.peer_resolver.clone())
546 .build(&network);
547
548 tracing::info!(
549 %local_id,
550 %local_addr,
551 %public_addr,
552 bootstrap_peers = bootstrap_peer_count,
553 "initialized network"
554 );
555
556 Ok(Self {
557 keypair,
558 network,
559 dht_client,
560 peer_resolver,
561 overlay_service,
562 })
563 }
564
565 pub fn add_blockchain_rpc<BL, SL>(
566 &self,
567 zerostate: &ZerostateId,
568 storage: CoreStorage,
569 remote_broadcast_listener: BL,
570 self_broadcast_listener: SL,
571 base_config: &NodeBaseConfig,
572 ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
573 where
574 BL: BroadcastListener,
575 SL: SelfBroadcastListener,
576 {
577 let blockchain_rpc_service = BlockchainRpcService::builder()
578 .with_config(base_config.blockchain_rpc_service.clone())
579 .with_storage(storage)
580 .with_broadcast_listener(remote_broadcast_listener)
581 .build();
582
583 let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
584 .named("blockchain_rpc")
585 .with_peer_resolver(self.peer_resolver.clone())
586 .build(blockchain_rpc_service.clone());
587 self.overlay_service.add_public_overlay(&public_overlay);
588
589 let blockchain_rpc_client = BlockchainRpcClient::builder()
590 .with_config(base_config.blockchain_rpc_client.clone())
591 .with_public_overlay_client(PublicOverlayClient::new(
592 self.network.clone(),
593 public_overlay,
594 base_config.public_overlay_client.clone(),
595 ))
596 .with_self_broadcast_listener(self_broadcast_listener)
597 .build();
598
599 tracing::info!(
600 overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
601 "initialized blockchain rpc"
602 );
603
604 (blockchain_rpc_service, blockchain_rpc_client)
605 }
606}
607
/// Storage handles produced by `ConfiguredStorage::new`.
pub struct ConfiguredStorage {
    /// Storage context created from the storage config.
    pub context: StorageContext,
    /// Core storage opened on a clone of `context`.
    pub core_storage: CoreStorage,
}
612
613impl ConfiguredStorage {
614 pub async fn new(
615 storage_config: &StorageConfig,
616 core_storage_config: &CoreStorageConfig,
617 ) -> Result<Self> {
618 let context = StorageContext::new(storage_config.clone())
619 .await
620 .context("failed to create storage context")?;
621 let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
622 .await
623 .context("failed to create storage")?;
624 tracing::info!(
625 root_dir = %core_storage.context().root_dir().path().display(),
626 "initialized storage"
627 );
628
629 Ok(Self {
630 context,
631 core_storage,
632 })
633 }
634}