1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8 DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13pub use self::config::NodeBaseConfig;
14pub use self::keys::NodeKeys;
15use crate::block_strider::{
16 ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
17 ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
18 StorageBlockProvider,
19};
20use crate::blockchain_rpc::{
21 BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
22};
23use crate::global_config::{GlobalConfig, ZerostateId};
24use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
25#[cfg(feature = "s3")]
26use crate::s3::S3Client;
27use crate::storage::{CoreStorage, CoreStorageConfig};
28
29mod config;
30mod keys;
31
/// Fully assembled base of a node: configs, networking handles, storage
/// handles and the blockchain RPC client.
///
/// Produced by `NodeBaseBuilder::build` once all builder steps are done.
pub struct NodeBase {
    /// Owned copy of the node-local config the builder was created with.
    pub base_config: NodeBaseConfig,
    /// Owned copy of the global config the builder was created with.
    pub global_config: GlobalConfig,

    /// Key pair derived from the secret key given to the network step.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network handle created by the network step.
    pub network: Network,
    /// DHT client created by the network step.
    pub dht_client: DhtClient,
    /// Peer resolver created by the network step.
    pub peer_resolver: PeerResolver,
    /// Overlay service created by the network step.
    pub overlay_service: OverlayService,

    /// Storage context created by the storage step.
    pub storage_context: StorageContext,
    /// Core storage opened on top of `storage_context`.
    pub core_storage: CoreStorage,

    /// Blockchain RPC client created by the blockchain RPC step.
    pub blockchain_rpc_client: BlockchainRpcClient,

    /// S3 client; `Some` only when `base_config.s3_client` is configured.
    #[cfg(feature = "s3")]
    pub s3_client: Option<S3Client>,
}
50
51impl NodeBase {
52 pub fn builder<'a>(
53 base_config: &'a NodeBaseConfig,
54 global_config: &'a GlobalConfig,
55 ) -> NodeBaseBuilder<'a, ()> {
56 crate::record_version_metric();
57 NodeBaseBuilder::new(base_config, global_config)
58 }
59
60 pub async fn init(
62 &self,
63 boot_type: ColdBootType,
64 import_zerostate: Option<Vec<PathBuf>>,
65 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
66 ) -> Result<BlockId> {
67 self.wait_for_neighbours(3).await;
68
69 let init_block_id = self
70 .boot(boot_type, import_zerostate, queue_state_handler)
71 .await
72 .context("failed to init node")?;
73
74 tracing::info!(%init_block_id, "node initialized");
75
76 Ok(init_block_id)
77 }
78
79 pub async fn wait_for_neighbours(&self, count: usize) {
81 tracing::info!("waiting for initial neighbours");
83 self.blockchain_rpc_client
84 .overlay_client()
85 .neighbours()
86 .wait_for_peers(count)
87 .await;
88 tracing::info!("found initial neighbours");
89 }
90
91 pub async fn boot(
93 &self,
94 boot_type: ColdBootType,
95 zerostates: Option<Vec<PathBuf>>,
96 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
97 ) -> Result<BlockId> {
98 let node_state = self.core_storage.node_state();
99
100 let last_mc_block_id = match node_state.load_last_mc_block_id() {
101 Some(block_id) => block_id,
102 None => {
103 let mut starter = Starter::builder()
104 .with_storage(self.core_storage.clone())
105 .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
106 .with_zerostate_id(self.global_config.zerostate)
107 .with_config(self.base_config.starter.clone());
108
109 if let Some(handler) = queue_state_handler {
110 starter = starter.with_queue_state_handler(handler);
111 }
112
113 starter
114 .build()
115 .cold_boot(boot_type, zerostates.map(FileZerostateProvider))
116 .await?
117 }
118 };
119
120 tracing::info!(
121 %last_mc_block_id,
122 "boot finished"
123 );
124
125 Ok(last_mc_block_id)
126 }
127
128 pub fn validator_resolver(&self) -> &ValidatorsResolver {
129 self.blockchain_rpc_client
130 .overlay_client()
131 .validators_resolver()
132 }
133
134 pub fn update_validator_set(&self, vset: &ValidatorSet) {
136 self.validator_resolver().update_validator_set(vset);
137 }
138
139 pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
141 let mc_state = self
143 .core_storage
144 .shard_state_storage()
145 .load_state(block_id.seqno, block_id)
146 .await?;
147
148 let config = mc_state.config_params()?;
149 let current_vset = config.get_current_validator_set()?;
150 self.update_validator_set(¤t_vset);
151 Ok(())
152 }
153
154 pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
155 ArchiveBlockProvider::new(
156 (
157 self.blockchain_rpc_client.clone(),
158 #[cfg(feature = "s3")]
159 self.s3_client.clone(),
160 ),
161 self.core_storage.clone(),
162 self.base_config.archive_block_provider.clone(),
163 )
164 }
165
166 pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
167 BlockchainBlockProvider::new(
168 self.blockchain_rpc_client.clone(),
169 self.core_storage.clone(),
170 self.base_config.blockchain_block_provider.clone(),
171 )
172 }
173
174 pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
175 StorageBlockProvider::new(self.core_storage.clone())
176 }
177
178 pub fn build_strider<P, S>(
180 &self,
181 provider: P,
182 subscriber: S,
183 ) -> BlockStrider<PersistentBlockStriderState, P, S>
184 where
185 P: BlockProvider,
186 S: BlockSubscriber,
187 {
188 let state = PersistentBlockStriderState::new(
189 self.global_config.zerostate.as_block_id(),
190 self.core_storage.clone(),
191 );
192
193 BlockStrider::builder()
194 .with_state(state)
195 .with_provider(provider)
196 .with_block_subscriber(subscriber)
197 .build()
198 }
199}
200
/// Step-by-step builder for `NodeBase`.
///
/// `Step` holds whatever the completed steps produced so far; it starts as
/// `()` and is replaced by the `init::Step*` types as steps are run.
pub struct NodeBaseBuilder<'a, Step = ()> {
    /// Borrowed node-local config.
    base_config: &'a NodeBaseConfig,
    /// Borrowed global config.
    global_config: &'a GlobalConfig,
    /// State accumulated by the steps completed so far.
    step: Step,
}
206
207impl<'a> NodeBaseBuilder<'a, ()> {
208 pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
209 Self {
210 base_config,
211 global_config,
212 step: (),
213 }
214 }
215
216 pub fn init_network(
217 self,
218 public_addr: SocketAddr,
219 secret_key: &ed25519::SecretKey,
220 ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
221 let net = ConfiguredNetwork::new(
222 public_addr,
223 secret_key,
224 self.base_config,
225 &self.global_config.bootstrap_peers,
226 )?;
227
228 Ok(NodeBaseBuilder {
229 base_config: self.base_config,
230 global_config: self.global_config,
231 step: init::Step0 { net },
232 })
233 }
234}
235
236impl<'a> NodeBaseBuilder<'a, init::Step0> {
237 pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
239 let store =
240 ConfiguredStorage::new(&self.base_config.storage, &self.base_config.core_storage)
241 .await?;
242
243 Ok(NodeBaseBuilder {
244 base_config: self.base_config,
245 global_config: self.global_config,
246 step: init::Step1 {
247 prev_step: self.step,
248 store,
249 },
250 })
251 }
252}
253
254impl<'a> NodeBaseBuilder<'a, init::Step1> {
255 pub fn init_blockchain_rpc<RL, SL>(
256 self,
257 remote_broadcast_listener: RL,
258 self_broadcast_listener: SL,
259 ) -> Result<NodeBaseBuilder<'a, init::Step2>>
260 where
261 RL: BroadcastListener,
262 SL: SelfBroadcastListener,
263 {
264 let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
265 &self.global_config.zerostate,
266 self.step.store.core_storage.clone(),
267 remote_broadcast_listener,
268 self_broadcast_listener,
269 self.base_config,
270 );
271
272 Ok(NodeBaseBuilder {
273 base_config: self.base_config,
274 global_config: self.global_config,
275 step: init::Step2 {
276 prev_step: self.step,
277 blockchain_rpc_client,
278 },
279 })
280 }
281}
282
283impl<'a> NodeBaseBuilder<'a, init::Final> {
284 pub fn build(self) -> Result<NodeBase> {
285 let net = self.step.prev_step.prev_step.net;
286 let store = self.step.prev_step.store;
287 let blockchain_rpc_client = self.step.blockchain_rpc_client;
288
289 Ok(NodeBase {
290 base_config: self.base_config.clone(),
291 global_config: self.global_config.clone(),
292 keypair: net.keypair,
293 network: net.network,
294 dht_client: net.dht_client,
295 peer_resolver: net.peer_resolver,
296 overlay_service: net.overlay_service,
297 storage_context: store.context,
298 core_storage: store.core_storage,
299 blockchain_rpc_client,
300 #[cfg(feature = "s3")]
301 s3_client: self
302 .base_config
303 .s3_client
304 .as_ref()
305 .map(S3Client::new)
306 .transpose()
307 .context("failed to create S3 client")?,
308 })
309 }
310}
311
312impl<'a, Step> NodeBaseBuilder<'a, Step> {
313 pub fn base_config(&self) -> &'a NodeBaseConfig {
314 self.base_config
315 }
316
317 pub fn global_config(&self) -> &'a GlobalConfig {
318 self.global_config
319 }
320}
321
322impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
323 pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
324 &self.step.as_ref().net.keypair
325 }
326
327 pub fn network(&self) -> &Network {
328 &self.step.as_ref().net.network
329 }
330
331 pub fn dht_client(&self) -> &DhtClient {
332 &self.step.as_ref().net.dht_client
333 }
334
335 pub fn peer_resolver(&self) -> &PeerResolver {
336 &self.step.as_ref().net.peer_resolver
337 }
338
339 pub fn overlay_service(&self) -> &OverlayService {
340 &self.step.as_ref().net.overlay_service
341 }
342}
343
344impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
345 pub fn storage_context(&self) -> &StorageContext {
346 &self.step.as_ref().store.context
347 }
348
349 pub fn core_storage(&self) -> &CoreStorage {
350 &self.step.as_ref().store.core_storage
351 }
352}
353
354impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
355 pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
356 &self.step.as_ref().blockchain_rpc_client
357 }
358}
359
360pub mod init {
361 use super::*;
362
363 pub type Final = Step2;
364
365 pub struct Step0 {
367 pub(super) net: ConfiguredNetwork,
368 }
369
370 impl AsRef<Step0> for Step0 {
371 #[inline]
372 fn as_ref(&self) -> &Step0 {
373 self
374 }
375 }
376
377 pub struct Step1 {
379 pub(super) prev_step: Step0,
380 pub(super) store: ConfiguredStorage,
381 }
382
383 impl AsRef<Step0> for Step1 {
384 #[inline]
385 fn as_ref(&self) -> &Step0 {
386 &self.prev_step
387 }
388 }
389
390 impl AsRef<Step1> for Step1 {
391 #[inline]
392 fn as_ref(&self) -> &Step1 {
393 self
394 }
395 }
396
397 pub struct Step2 {
399 pub(super) prev_step: Step1,
400 pub(super) blockchain_rpc_client: BlockchainRpcClient,
401 }
402
403 impl AsRef<Step0> for Step2 {
404 #[inline]
405 fn as_ref(&self) -> &Step0 {
406 &self.prev_step.prev_step
407 }
408 }
409
410 impl AsRef<Step1> for Step2 {
411 #[inline]
412 fn as_ref(&self) -> &Step1 {
413 &self.prev_step
414 }
415 }
416
417 impl AsRef<Step2> for Step2 {
418 #[inline]
419 fn as_ref(&self) -> &Step2 {
420 self
421 }
422 }
423}
424
/// Networking handles produced by `ConfiguredNetwork::new`.
pub struct ConfiguredNetwork {
    /// Key pair derived from the secret key passed to `new`.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network built with the DHT and overlay services routed into it.
    pub network: Network,
    /// Client of the DHT service, bound to `network`.
    pub dht_client: DhtClient,
    /// Peer resolver created from the DHT service, bound to `network`.
    pub peer_resolver: PeerResolver,
    /// Overlay service that public overlays are registered on.
    pub overlay_service: OverlayService,
}
432
433impl ConfiguredNetwork {
434 pub fn new(
435 public_addr: SocketAddr,
436 secret_key: &ed25519::SecretKey,
437 base_config: &NodeBaseConfig,
438 bootstrap_peers: &[PeerInfo],
439 ) -> Result<Self> {
440 let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
442 let local_id = keypair.public_key.into();
443
444 let (dht_tasks, dht_service) = DhtService::builder(local_id)
445 .with_config(base_config.dht.clone())
446 .build();
447
448 let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
449 .with_config(base_config.overlay.clone())
450 .with_dht_service(dht_service.clone())
451 .build();
452
453 let router = Router::builder()
454 .route(dht_service.clone())
455 .route(overlay_service.clone())
456 .build();
457
458 let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
459
460 let network = Network::builder()
461 .with_config(base_config.network.clone())
462 .with_private_key(secret_key.to_bytes())
463 .with_remote_addr(public_addr)
464 .build(local_addr, router)
465 .context("failed to build node network")?;
466
467 dht_tasks.spawn(&network);
468 overlay_tasks.spawn(&network);
469
470 let dht_client = dht_service.make_client(&network);
471 let peer_resolver = dht_service
472 .make_peer_resolver()
473 .with_config(base_config.peer_resolver.clone())
474 .build(&network);
475
476 let mut bootstrap_peer_count = 0usize;
477 for peer in bootstrap_peers {
478 let is_new = dht_client.add_peer(Arc::new(peer.clone()))?;
479 bootstrap_peer_count += is_new as usize;
480 }
481
482 tracing::info!(
483 %local_id,
484 %local_addr,
485 %public_addr,
486 bootstrap_peers = bootstrap_peer_count,
487 "initialized network"
488 );
489
490 Ok(Self {
491 keypair,
492 network,
493 dht_client,
494 peer_resolver,
495 overlay_service,
496 })
497 }
498
499 pub fn add_blockchain_rpc<BL, SL>(
500 &self,
501 zerostate: &ZerostateId,
502 storage: CoreStorage,
503 remote_broadcast_listener: BL,
504 self_broadcast_listener: SL,
505 base_config: &NodeBaseConfig,
506 ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
507 where
508 BL: BroadcastListener,
509 SL: SelfBroadcastListener,
510 {
511 let blockchain_rpc_service = BlockchainRpcService::builder()
512 .with_config(base_config.blockchain_rpc_service.clone())
513 .with_storage(storage)
514 .with_broadcast_listener(remote_broadcast_listener)
515 .build();
516
517 let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
518 .named("blockchain_rpc")
519 .with_peer_resolver(self.peer_resolver.clone())
520 .build(blockchain_rpc_service.clone());
521 self.overlay_service.add_public_overlay(&public_overlay);
522
523 let blockchain_rpc_client = BlockchainRpcClient::builder()
524 .with_config(base_config.blockchain_rpc_client.clone())
525 .with_public_overlay_client(PublicOverlayClient::new(
526 self.network.clone(),
527 public_overlay,
528 base_config.public_overlay_client.clone(),
529 ))
530 .with_self_broadcast_listener(self_broadcast_listener)
531 .build();
532
533 tracing::info!(
534 overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
535 "initialized blockchain rpc"
536 );
537
538 (blockchain_rpc_service, blockchain_rpc_client)
539 }
540}
541
/// Storage handles produced by `ConfiguredStorage::new`.
pub struct ConfiguredStorage {
    /// Storage context created from the storage config.
    pub context: StorageContext,
    /// Core storage opened on a clone of `context`.
    pub core_storage: CoreStorage,
}
546
547impl ConfiguredStorage {
548 pub async fn new(
549 storage_config: &StorageConfig,
550 core_storage_config: &CoreStorageConfig,
551 ) -> Result<Self> {
552 let context = StorageContext::new(storage_config.clone())
553 .await
554 .context("failed to create storage context")?;
555 let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
556 .await
557 .context("failed to create storage")?;
558 tracing::info!(
559 root_dir = %core_storage.context().root_dir().path().display(),
560 "initialized storage"
561 );
562
563 Ok(Self {
564 context,
565 core_storage,
566 })
567 }
568}