1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8 DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13pub use self::config::NodeBaseConfig;
14pub use self::keys::NodeKeys;
15use crate::block_strider::{
16 ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
17 ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
18 StorageBlockProvider,
19};
20use crate::blockchain_rpc::{
21 BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
22};
23use crate::global_config::{GlobalConfig, ZerostateId};
24use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
25use crate::storage::{CoreStorage, CoreStorageConfig};
26
27mod config;
28mod keys;
29
/// Bundle of configuration, networking, storage and RPC handles that make up
/// an initialized node.
///
/// Instances are assembled by `NodeBaseBuilder::build` from the results of the
/// builder's network, storage and blockchain RPC steps.
pub struct NodeBase {
    /// Node configuration; `NodeBaseBuilder::build` stores a clone of the
    /// config the builder was created with.
    pub base_config: NodeBaseConfig,
    /// Global configuration; also cloned from the builder's reference.
    pub global_config: GlobalConfig,

    /// Key pair created from the secret key in `ConfiguredNetwork::new`.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network handle built in `ConfiguredNetwork::new`.
    pub network: Network,
    /// DHT client created from the DHT service for `network`.
    pub dht_client: DhtClient,
    /// Peer resolver created from the DHT service for `network`.
    pub peer_resolver: PeerResolver,
    /// Overlay service; the blockchain RPC public overlay is registered in it.
    pub overlay_service: OverlayService,

    /// Storage context created in `ConfiguredStorage::new`.
    pub storage_context: StorageContext,
    /// Core storage opened on top of `storage_context`.
    pub core_storage: CoreStorage,

    /// Blockchain RPC client produced by `ConfiguredNetwork::add_blockchain_rpc`.
    pub blockchain_rpc_client: BlockchainRpcClient,
}
45
46impl NodeBase {
47 pub fn builder<'a>(
48 base_config: &'a NodeBaseConfig,
49 global_config: &'a GlobalConfig,
50 ) -> NodeBaseBuilder<'a, ()> {
51 NodeBaseBuilder::new(base_config, global_config)
52 }
53
54 pub async fn init(
56 &self,
57 boot_type: ColdBootType,
58 import_zerostate: Option<Vec<PathBuf>>,
59 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
60 ) -> Result<BlockId> {
61 self.wait_for_neighbours(3).await;
62
63 let init_block_id = self
64 .boot(boot_type, import_zerostate, queue_state_handler)
65 .await
66 .context("failed to init node")?;
67
68 tracing::info!(%init_block_id, "node initialized");
69
70 Ok(init_block_id)
71 }
72
73 pub async fn wait_for_neighbours(&self, count: usize) {
75 tracing::info!("waiting for initial neighbours");
77 self.blockchain_rpc_client
78 .overlay_client()
79 .neighbours()
80 .wait_for_peers(count)
81 .await;
82 tracing::info!("found initial neighbours");
83 }
84
85 pub async fn boot(
87 &self,
88 boot_type: ColdBootType,
89 zerostates: Option<Vec<PathBuf>>,
90 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
91 ) -> Result<BlockId> {
92 let node_state = self.core_storage.node_state();
93
94 let last_mc_block_id = match node_state.load_last_mc_block_id() {
95 Some(block_id) => block_id,
96 None => {
97 let mut starter = Starter::builder()
98 .with_storage(self.core_storage.clone())
99 .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
100 .with_zerostate_id(self.global_config.zerostate)
101 .with_config(self.base_config.starter.clone());
102
103 if let Some(handler) = queue_state_handler {
104 starter = starter.with_queue_state_handler(handler);
105 }
106
107 starter
108 .build()
109 .cold_boot(boot_type, zerostates.map(FileZerostateProvider))
110 .await?
111 }
112 };
113
114 tracing::info!(
115 %last_mc_block_id,
116 "boot finished"
117 );
118
119 Ok(last_mc_block_id)
120 }
121
122 pub fn validator_resolver(&self) -> &ValidatorsResolver {
123 self.blockchain_rpc_client
124 .overlay_client()
125 .validators_resolver()
126 }
127
128 pub fn update_validator_set(&self, vset: &ValidatorSet) {
130 self.validator_resolver().update_validator_set(vset);
131 }
132
133 pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
135 let mc_state = self
137 .core_storage
138 .shard_state_storage()
139 .load_state(block_id.seqno, block_id)
140 .await?;
141
142 let config = mc_state.config_params()?;
143 let current_vset = config.get_current_validator_set()?;
144 self.update_validator_set(¤t_vset);
145 Ok(())
146 }
147
148 pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
149 ArchiveBlockProvider::new(
150 self.blockchain_rpc_client.clone(),
151 self.core_storage.clone(),
152 self.base_config.archive_block_provider.clone(),
153 )
154 }
155
156 pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
157 BlockchainBlockProvider::new(
158 self.blockchain_rpc_client.clone(),
159 self.core_storage.clone(),
160 self.base_config.blockchain_block_provider.clone(),
161 )
162 }
163
164 pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
165 StorageBlockProvider::new(self.core_storage.clone())
166 }
167
168 pub fn build_strider<P, S>(
170 &self,
171 provider: P,
172 subscriber: S,
173 ) -> BlockStrider<PersistentBlockStriderState, P, S>
174 where
175 P: BlockProvider,
176 S: BlockSubscriber,
177 {
178 let state = PersistentBlockStriderState::new(
179 self.global_config.zerostate.as_block_id(),
180 self.core_storage.clone(),
181 );
182
183 BlockStrider::builder()
184 .with_state(state)
185 .with_provider(provider)
186 .with_block_subscriber(subscriber)
187 .build()
188 }
189}
190
/// Staged builder for `NodeBase`.
///
/// `Step` holds the results of the initialization stages completed so far:
/// `()` initially, then `init::Step0`, `init::Step1` and `init::Step2`.
pub struct NodeBaseBuilder<'a, Step = ()> {
    // Borrowed node configuration; cloned into `NodeBase` by `build`.
    base_config: &'a NodeBaseConfig,
    // Borrowed global configuration; cloned into `NodeBase` by `build`.
    global_config: &'a GlobalConfig,
    // Accumulated state of the completed initialization stages.
    step: Step,
}
196
197impl<'a> NodeBaseBuilder<'a, ()> {
198 pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
199 Self {
200 base_config,
201 global_config,
202 step: (),
203 }
204 }
205
206 pub fn init_network(
207 self,
208 public_addr: SocketAddr,
209 secret_key: &ed25519::SecretKey,
210 ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
211 let net = ConfiguredNetwork::new(
212 public_addr,
213 secret_key,
214 self.base_config,
215 &self.global_config.bootstrap_peers,
216 )?;
217
218 Ok(NodeBaseBuilder {
219 base_config: self.base_config,
220 global_config: self.global_config,
221 step: init::Step0 { net },
222 })
223 }
224}
225
226impl<'a> NodeBaseBuilder<'a, init::Step0> {
227 pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
229 let store =
230 ConfiguredStorage::new(&self.base_config.storage, &self.base_config.core_storage)
231 .await?;
232
233 Ok(NodeBaseBuilder {
234 base_config: self.base_config,
235 global_config: self.global_config,
236 step: init::Step1 {
237 prev_step: self.step,
238 store,
239 },
240 })
241 }
242}
243
244impl<'a> NodeBaseBuilder<'a, init::Step1> {
245 pub fn init_blockchain_rpc<RL, SL>(
246 self,
247 remote_broadcast_listener: RL,
248 self_broadcast_listener: SL,
249 ) -> Result<NodeBaseBuilder<'a, init::Step2>>
250 where
251 RL: BroadcastListener,
252 SL: SelfBroadcastListener,
253 {
254 let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
255 &self.global_config.zerostate,
256 self.step.store.core_storage.clone(),
257 remote_broadcast_listener,
258 self_broadcast_listener,
259 self.base_config,
260 );
261
262 Ok(NodeBaseBuilder {
263 base_config: self.base_config,
264 global_config: self.global_config,
265 step: init::Step2 {
266 prev_step: self.step,
267 blockchain_rpc_client,
268 },
269 })
270 }
271}
272
273impl NodeBaseBuilder<'_, init::Final> {
274 pub fn build(self) -> Result<NodeBase> {
275 let net = self.step.prev_step.prev_step.net;
276 let store = self.step.prev_step.store;
277 let blockchain_rpc_client = self.step.blockchain_rpc_client;
278
279 Ok(NodeBase {
280 base_config: self.base_config.clone(),
281 global_config: self.global_config.clone(),
282 keypair: net.keypair,
283 network: net.network,
284 dht_client: net.dht_client,
285 peer_resolver: net.peer_resolver,
286 overlay_service: net.overlay_service,
287 storage_context: store.context,
288 core_storage: store.core_storage,
289 blockchain_rpc_client,
290 })
291 }
292}
293
// Accessors available at every builder step.
impl<'a, Step> NodeBaseBuilder<'a, Step> {
    /// Returns the node configuration reference the builder was created with.
    ///
    /// The returned reference has the builder's `'a` lifetime, not the
    /// lifetime of `&self`.
    pub fn base_config(&self) -> &'a NodeBaseConfig {
        self.base_config
    }

    /// Returns the global configuration reference the builder was created with.
    ///
    /// The returned reference has the builder's `'a` lifetime, not the
    /// lifetime of `&self`.
    pub fn global_config(&self) -> &'a GlobalConfig {
        self.global_config
    }
}
303
304impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
305 pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
306 &self.step.as_ref().net.keypair
307 }
308
309 pub fn network(&self) -> &Network {
310 &self.step.as_ref().net.network
311 }
312
313 pub fn dht_client(&self) -> &DhtClient {
314 &self.step.as_ref().net.dht_client
315 }
316
317 pub fn peer_resolver(&self) -> &PeerResolver {
318 &self.step.as_ref().net.peer_resolver
319 }
320
321 pub fn overlay_service(&self) -> &OverlayService {
322 &self.step.as_ref().net.overlay_service
323 }
324}
325
326impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
327 pub fn storage_context(&self) -> &StorageContext {
328 &self.step.as_ref().store.context
329 }
330
331 pub fn core_storage(&self) -> &CoreStorage {
332 &self.step.as_ref().store.core_storage
333 }
334}
335
336impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
337 pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
338 &self.step.as_ref().blockchain_rpc_client
339 }
340}
341
342pub mod init {
343 use super::*;
344
345 pub type Final = Step2;
346
347 pub struct Step0 {
349 pub(super) net: ConfiguredNetwork,
350 }
351
352 impl AsRef<Step0> for Step0 {
353 #[inline]
354 fn as_ref(&self) -> &Step0 {
355 self
356 }
357 }
358
359 pub struct Step1 {
361 pub(super) prev_step: Step0,
362 pub(super) store: ConfiguredStorage,
363 }
364
365 impl AsRef<Step0> for Step1 {
366 #[inline]
367 fn as_ref(&self) -> &Step0 {
368 &self.prev_step
369 }
370 }
371
372 impl AsRef<Step1> for Step1 {
373 #[inline]
374 fn as_ref(&self) -> &Step1 {
375 self
376 }
377 }
378
379 pub struct Step2 {
381 pub(super) prev_step: Step1,
382 pub(super) blockchain_rpc_client: BlockchainRpcClient,
383 }
384
385 impl AsRef<Step0> for Step2 {
386 #[inline]
387 fn as_ref(&self) -> &Step0 {
388 &self.prev_step.prev_step
389 }
390 }
391
392 impl AsRef<Step1> for Step2 {
393 #[inline]
394 fn as_ref(&self) -> &Step1 {
395 &self.prev_step
396 }
397 }
398
399 impl AsRef<Step2> for Step2 {
400 #[inline]
401 fn as_ref(&self) -> &Step2 {
402 self
403 }
404 }
405}
406
/// Networking handles produced by `ConfiguredNetwork::new`.
pub struct ConfiguredNetwork {
    /// Key pair created from the secret key passed to `new`.
    pub keypair: Arc<ed25519::KeyPair>,
    /// Network built with a router over the DHT and overlay services.
    pub network: Network,
    /// DHT client created from the DHT service; bootstrap peers are added to it.
    pub dht_client: DhtClient,
    /// Peer resolver created from the DHT service.
    pub peer_resolver: PeerResolver,
    /// Overlay service; public overlays are registered through it.
    pub overlay_service: OverlayService,
}
414
415impl ConfiguredNetwork {
416 pub fn new(
417 public_addr: SocketAddr,
418 secret_key: &ed25519::SecretKey,
419 base_config: &NodeBaseConfig,
420 bootstrap_peers: &[PeerInfo],
421 ) -> Result<Self> {
422 let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
424 let local_id = keypair.public_key.into();
425
426 let (dht_tasks, dht_service) = DhtService::builder(local_id)
427 .with_config(base_config.dht.clone())
428 .build();
429
430 let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
431 .with_config(base_config.overlay.clone())
432 .with_dht_service(dht_service.clone())
433 .build();
434
435 let router = Router::builder()
436 .route(dht_service.clone())
437 .route(overlay_service.clone())
438 .build();
439
440 let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
441
442 let network = Network::builder()
443 .with_config(base_config.network.clone())
444 .with_private_key(secret_key.to_bytes())
445 .with_remote_addr(public_addr)
446 .build(local_addr, router)
447 .context("failed to build node network")?;
448
449 dht_tasks.spawn(&network);
450 overlay_tasks.spawn(&network);
451
452 let dht_client = dht_service.make_client(&network);
453 let peer_resolver = dht_service
454 .make_peer_resolver()
455 .with_config(base_config.peer_resolver.clone())
456 .build(&network);
457
458 let mut bootstrap_peer_count = 0usize;
459 for peer in bootstrap_peers {
460 let is_new = dht_client.add_peer(Arc::new(peer.clone()))?;
461 bootstrap_peer_count += is_new as usize;
462 }
463
464 tracing::info!(
465 %local_id,
466 %local_addr,
467 %public_addr,
468 bootstrap_peers = bootstrap_peer_count,
469 "initialized network"
470 );
471
472 Ok(Self {
473 keypair,
474 network,
475 dht_client,
476 peer_resolver,
477 overlay_service,
478 })
479 }
480
481 pub fn add_blockchain_rpc<BL, SL>(
482 &self,
483 zerostate: &ZerostateId,
484 storage: CoreStorage,
485 remote_broadcast_listener: BL,
486 self_broadcast_listener: SL,
487 base_config: &NodeBaseConfig,
488 ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
489 where
490 BL: BroadcastListener,
491 SL: SelfBroadcastListener,
492 {
493 let blockchain_rpc_service = BlockchainRpcService::builder()
494 .with_config(base_config.blockchain_rpc_service.clone())
495 .with_storage(storage)
496 .with_broadcast_listener(remote_broadcast_listener)
497 .build();
498
499 let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
500 .named("blockchain_rpc")
501 .with_peer_resolver(self.peer_resolver.clone())
502 .build(blockchain_rpc_service.clone());
503 self.overlay_service.add_public_overlay(&public_overlay);
504
505 let blockchain_rpc_client = BlockchainRpcClient::builder()
506 .with_config(base_config.blockchain_rpc_client.clone())
507 .with_public_overlay_client(PublicOverlayClient::new(
508 self.network.clone(),
509 public_overlay,
510 base_config.public_overlay_client.clone(),
511 ))
512 .with_self_broadcast_listener(self_broadcast_listener)
513 .build();
514
515 tracing::info!(
516 overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
517 "initialized blockchain rpc"
518 );
519
520 (blockchain_rpc_service, blockchain_rpc_client)
521 }
522}
523
/// Storage handles produced by `ConfiguredStorage::new`.
pub struct ConfiguredStorage {
    /// Storage context created from the storage config.
    pub context: StorageContext,
    /// Core storage opened with a clone of `context`.
    pub core_storage: CoreStorage,
}
528
529impl ConfiguredStorage {
530 pub async fn new(
531 storage_config: &StorageConfig,
532 core_storage_config: &CoreStorageConfig,
533 ) -> Result<Self> {
534 let context = StorageContext::new(storage_config.clone())
535 .await
536 .context("failed to create storage context")?;
537 let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
538 .await
539 .context("failed to create storage")?;
540 tracing::info!(
541 root_dir = %core_storage.context().root_dir().path().display(),
542 "initialized storage"
543 );
544
545 Ok(Self {
546 context,
547 core_storage,
548 })
549 }
550}