1use std::net::SocketAddr;
2use std::path::PathBuf;
3use std::sync::Arc;
4
5use anyhow::{Context, Result};
6use tycho_crypto::ed25519;
7use tycho_network::{
8 DhtClient, DhtService, Network, OverlayService, PeerInfo, PeerResolver, PublicOverlay, Router,
9};
10use tycho_storage::{StorageConfig, StorageContext};
11use tycho_types::models::{BlockId, ValidatorSet};
12
13pub use self::config::NodeBaseConfig;
14pub use self::keys::NodeKeys;
15use crate::block_strider::{
16 ArchiveBlockProvider, BlockProvider, BlockStrider, BlockSubscriber, BlockchainBlockProvider,
17 ColdBootType, FileZerostateProvider, PersistentBlockStriderState, QueueStateHandler, Starter,
18 StorageBlockProvider,
19};
20use crate::blockchain_rpc::{
21 BlockchainRpcClient, BlockchainRpcService, BroadcastListener, SelfBroadcastListener,
22};
23use crate::global_config::{GlobalConfig, ZerostateId};
24use crate::overlay_client::{PublicOverlayClient, ValidatorsResolver};
25use crate::storage::{CoreStorage, CoreStorageConfig};
26
27mod config;
28mod keys;
29
30pub struct NodeBase {
31 pub base_config: NodeBaseConfig,
32 pub global_config: GlobalConfig,
33
34 pub keypair: Arc<ed25519::KeyPair>,
35 pub network: Network,
36 pub dht_client: DhtClient,
37 pub peer_resolver: PeerResolver,
38 pub overlay_service: OverlayService,
39
40 pub storage_context: StorageContext,
41 pub core_storage: CoreStorage,
42
43 pub blockchain_rpc_client: BlockchainRpcClient,
44}
45
46impl NodeBase {
47 pub fn builder<'a>(
48 base_config: &'a NodeBaseConfig,
49 global_config: &'a GlobalConfig,
50 ) -> NodeBaseBuilder<'a, ()> {
51 NodeBaseBuilder::new(base_config, global_config)
52 }
53
54 pub async fn init(
56 &self,
57 boot_type: ColdBootType,
58 import_zerostate: Option<Vec<PathBuf>>,
59 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
60 ) -> Result<BlockId> {
61 self.wait_for_neighbours(3).await;
62
63 let init_block_id = self
64 .boot(boot_type, import_zerostate, queue_state_handler)
65 .await
66 .context("failed to init node")?;
67
68 tracing::info!(%init_block_id, "node initialized");
69
70 Ok(init_block_id)
71 }
72
73 pub async fn wait_for_neighbours(&self, count: usize) {
75 tracing::info!("waiting for initial neighbours");
77 self.blockchain_rpc_client
78 .overlay_client()
79 .neighbours()
80 .wait_for_peers(count)
81 .await;
82 tracing::info!("found initial neighbours");
83 }
84
85 pub async fn boot(
87 &self,
88 boot_type: ColdBootType,
89 zerostates: Option<Vec<PathBuf>>,
90 queue_state_handler: Option<Box<dyn QueueStateHandler>>,
91 ) -> Result<BlockId> {
92 let node_state = self.core_storage.node_state();
93
94 let last_mc_block_id = match node_state.load_last_mc_block_id() {
95 Some(block_id) => block_id,
96 None => {
97 let mut starter = Starter::builder()
98 .with_storage(self.core_storage.clone())
99 .with_blockchain_rpc_client(self.blockchain_rpc_client.clone())
100 .with_zerostate_id(self.global_config.zerostate)
101 .with_config(self.base_config.starter.clone());
102
103 if let Some(handler) = queue_state_handler {
104 starter = starter.with_queue_state_handler(handler);
105 }
106
107 starter
108 .build()
109 .cold_boot(boot_type, zerostates.map(FileZerostateProvider))
110 .await?
111 }
112 };
113
114 let shard_state = self.core_storage.shard_state_storage();
115 Starter::init_allowed_workchains(shard_state, &last_mc_block_id).await?;
116
117 tracing::info!(
118 %last_mc_block_id,
119 "boot finished"
120 );
121
122 Ok(last_mc_block_id)
123 }
124
125 pub fn validator_resolver(&self) -> &ValidatorsResolver {
126 self.blockchain_rpc_client
127 .overlay_client()
128 .validators_resolver()
129 }
130
131 pub fn update_validator_set(&self, vset: &ValidatorSet) {
133 self.validator_resolver().update_validator_set(vset);
134 }
135
136 pub async fn update_validator_set_from_shard_state(&self, block_id: &BlockId) -> Result<()> {
138 let mc_state = self
140 .core_storage
141 .shard_state_storage()
142 .load_state(block_id.seqno, block_id)
143 .await?;
144
145 let config = mc_state.config_params()?;
146 let current_vset = config.get_current_validator_set()?;
147 self.update_validator_set(¤t_vset);
148 Ok(())
149 }
150
151 pub fn build_archive_block_provider(&self) -> ArchiveBlockProvider {
152 ArchiveBlockProvider::new(
153 self.blockchain_rpc_client.clone(),
154 self.core_storage.clone(),
155 self.base_config.archive_block_provider.clone(),
156 )
157 }
158
159 pub fn build_blockchain_block_provider(&self) -> BlockchainBlockProvider {
160 BlockchainBlockProvider::new(
161 self.blockchain_rpc_client.clone(),
162 self.core_storage.clone(),
163 self.base_config.blockchain_block_provider.clone(),
164 )
165 }
166
167 pub fn build_storage_block_provider(&self) -> StorageBlockProvider {
168 StorageBlockProvider::new(self.core_storage.clone())
169 }
170
171 pub fn build_strider<P, S>(
173 &self,
174 provider: P,
175 subscriber: S,
176 ) -> BlockStrider<PersistentBlockStriderState, P, S>
177 where
178 P: BlockProvider,
179 S: BlockSubscriber,
180 {
181 let state = PersistentBlockStriderState::new(
182 self.global_config.zerostate.as_block_id(),
183 self.core_storage.clone(),
184 );
185
186 BlockStrider::builder()
187 .with_state(state)
188 .with_provider(provider)
189 .with_block_subscriber(subscriber)
190 .build()
191 }
192}
193
194pub struct NodeBaseBuilder<'a, Step = ()> {
195 base_config: &'a NodeBaseConfig,
196 global_config: &'a GlobalConfig,
197 step: Step,
198}
199
200impl<'a> NodeBaseBuilder<'a, ()> {
201 pub fn new(base_config: &'a NodeBaseConfig, global_config: &'a GlobalConfig) -> Self {
202 Self {
203 base_config,
204 global_config,
205 step: (),
206 }
207 }
208
209 pub fn init_network(
210 self,
211 public_addr: SocketAddr,
212 secret_key: &ed25519::SecretKey,
213 ) -> Result<NodeBaseBuilder<'a, init::Step0>> {
214 let net = ConfiguredNetwork::new(
215 public_addr,
216 secret_key,
217 self.base_config,
218 &self.global_config.bootstrap_peers,
219 )?;
220
221 Ok(NodeBaseBuilder {
222 base_config: self.base_config,
223 global_config: self.global_config,
224 step: init::Step0 { net },
225 })
226 }
227}
228
229impl<'a> NodeBaseBuilder<'a, init::Step0> {
230 pub async fn init_storage(self) -> Result<NodeBaseBuilder<'a, init::Step1>> {
232 let store =
233 ConfiguredStorage::new(&self.base_config.storage, &self.base_config.core_storage)
234 .await?;
235
236 Ok(NodeBaseBuilder {
237 base_config: self.base_config,
238 global_config: self.global_config,
239 step: init::Step1 {
240 prev_step: self.step,
241 store,
242 },
243 })
244 }
245}
246
247impl<'a> NodeBaseBuilder<'a, init::Step1> {
248 pub fn init_blockchain_rpc<RL, SL>(
249 self,
250 remote_broadcast_listener: RL,
251 self_broadcast_listener: SL,
252 ) -> Result<NodeBaseBuilder<'a, init::Step2>>
253 where
254 RL: BroadcastListener,
255 SL: SelfBroadcastListener,
256 {
257 let (_, blockchain_rpc_client) = self.step.prev_step.net.add_blockchain_rpc(
258 &self.global_config.zerostate,
259 self.step.store.core_storage.clone(),
260 remote_broadcast_listener,
261 self_broadcast_listener,
262 self.base_config,
263 );
264
265 Ok(NodeBaseBuilder {
266 base_config: self.base_config,
267 global_config: self.global_config,
268 step: init::Step2 {
269 prev_step: self.step,
270 blockchain_rpc_client,
271 },
272 })
273 }
274}
275
276impl NodeBaseBuilder<'_, init::Final> {
277 pub fn build(self) -> Result<NodeBase> {
278 let net = self.step.prev_step.prev_step.net;
279 let store = self.step.prev_step.store;
280 let blockchain_rpc_client = self.step.blockchain_rpc_client;
281
282 Ok(NodeBase {
283 base_config: self.base_config.clone(),
284 global_config: self.global_config.clone(),
285 keypair: net.keypair,
286 network: net.network,
287 dht_client: net.dht_client,
288 peer_resolver: net.peer_resolver,
289 overlay_service: net.overlay_service,
290 storage_context: store.context,
291 core_storage: store.core_storage,
292 blockchain_rpc_client,
293 })
294 }
295}
296
297impl<'a, Step> NodeBaseBuilder<'a, Step> {
298 pub fn base_config(&self) -> &'a NodeBaseConfig {
299 self.base_config
300 }
301
302 pub fn global_config(&self) -> &'a GlobalConfig {
303 self.global_config
304 }
305}
306
307impl<Step: AsRef<init::Step0>> NodeBaseBuilder<'_, Step> {
308 pub fn keypair(&self) -> &Arc<ed25519::KeyPair> {
309 &self.step.as_ref().net.keypair
310 }
311
312 pub fn network(&self) -> &Network {
313 &self.step.as_ref().net.network
314 }
315
316 pub fn dht_client(&self) -> &DhtClient {
317 &self.step.as_ref().net.dht_client
318 }
319
320 pub fn peer_resolver(&self) -> &PeerResolver {
321 &self.step.as_ref().net.peer_resolver
322 }
323
324 pub fn overlay_service(&self) -> &OverlayService {
325 &self.step.as_ref().net.overlay_service
326 }
327}
328
329impl<Step: AsRef<init::Step1>> NodeBaseBuilder<'_, Step> {
330 pub fn storage_context(&self) -> &StorageContext {
331 &self.step.as_ref().store.context
332 }
333
334 pub fn core_storage(&self) -> &CoreStorage {
335 &self.step.as_ref().store.core_storage
336 }
337}
338
339impl<Step: AsRef<init::Step2>> NodeBaseBuilder<'_, Step> {
340 pub fn blockchain_rpc_client(&self) -> &BlockchainRpcClient {
341 &self.step.as_ref().blockchain_rpc_client
342 }
343}
344
345pub mod init {
346 use super::*;
347
348 pub type Final = Step2;
349
350 pub struct Step0 {
352 pub(super) net: ConfiguredNetwork,
353 }
354
355 impl AsRef<Step0> for Step0 {
356 #[inline]
357 fn as_ref(&self) -> &Step0 {
358 self
359 }
360 }
361
362 pub struct Step1 {
364 pub(super) prev_step: Step0,
365 pub(super) store: ConfiguredStorage,
366 }
367
368 impl AsRef<Step0> for Step1 {
369 #[inline]
370 fn as_ref(&self) -> &Step0 {
371 &self.prev_step
372 }
373 }
374
375 impl AsRef<Step1> for Step1 {
376 #[inline]
377 fn as_ref(&self) -> &Step1 {
378 self
379 }
380 }
381
382 pub struct Step2 {
384 pub(super) prev_step: Step1,
385 pub(super) blockchain_rpc_client: BlockchainRpcClient,
386 }
387
388 impl AsRef<Step0> for Step2 {
389 #[inline]
390 fn as_ref(&self) -> &Step0 {
391 &self.prev_step.prev_step
392 }
393 }
394
395 impl AsRef<Step1> for Step2 {
396 #[inline]
397 fn as_ref(&self) -> &Step1 {
398 &self.prev_step
399 }
400 }
401
402 impl AsRef<Step2> for Step2 {
403 #[inline]
404 fn as_ref(&self) -> &Step2 {
405 self
406 }
407 }
408}
409
410pub struct ConfiguredNetwork {
411 pub keypair: Arc<ed25519::KeyPair>,
412 pub network: Network,
413 pub dht_client: DhtClient,
414 pub peer_resolver: PeerResolver,
415 pub overlay_service: OverlayService,
416}
417
418impl ConfiguredNetwork {
419 pub fn new(
420 public_addr: SocketAddr,
421 secret_key: &ed25519::SecretKey,
422 base_config: &NodeBaseConfig,
423 bootstrap_peers: &[PeerInfo],
424 ) -> Result<Self> {
425 let keypair = Arc::new(ed25519::KeyPair::from(secret_key));
427 let local_id = keypair.public_key.into();
428
429 let (dht_tasks, dht_service) = DhtService::builder(local_id)
430 .with_config(base_config.dht.clone())
431 .build();
432
433 let (overlay_tasks, overlay_service) = OverlayService::builder(local_id)
434 .with_config(base_config.overlay.clone())
435 .with_dht_service(dht_service.clone())
436 .build();
437
438 let router = Router::builder()
439 .route(dht_service.clone())
440 .route(overlay_service.clone())
441 .build();
442
443 let local_addr = SocketAddr::from((base_config.local_ip, base_config.port));
444
445 let network = Network::builder()
446 .with_config(base_config.network.clone())
447 .with_private_key(secret_key.to_bytes())
448 .with_remote_addr(public_addr)
449 .build(local_addr, router)
450 .context("failed to build node network")?;
451
452 dht_tasks.spawn(&network);
453 overlay_tasks.spawn(&network);
454
455 let dht_client = dht_service.make_client(&network);
456 let peer_resolver = dht_service
457 .make_peer_resolver()
458 .with_config(base_config.peer_resolver.clone())
459 .build(&network);
460
461 let mut bootstrap_peer_count = 0usize;
462 for peer in bootstrap_peers {
463 let is_new = dht_client.add_peer(Arc::new(peer.clone()))?;
464 bootstrap_peer_count += is_new as usize;
465 }
466
467 tracing::info!(
468 %local_id,
469 %local_addr,
470 %public_addr,
471 bootstrap_peers = bootstrap_peer_count,
472 "initialized network"
473 );
474
475 Ok(Self {
476 keypair,
477 network,
478 dht_client,
479 peer_resolver,
480 overlay_service,
481 })
482 }
483
484 pub fn add_blockchain_rpc<BL, SL>(
485 &self,
486 zerostate: &ZerostateId,
487 storage: CoreStorage,
488 remote_broadcast_listener: BL,
489 self_broadcast_listener: SL,
490 base_config: &NodeBaseConfig,
491 ) -> (BlockchainRpcService<BL>, BlockchainRpcClient)
492 where
493 BL: BroadcastListener,
494 SL: SelfBroadcastListener,
495 {
496 let blockchain_rpc_service = BlockchainRpcService::builder()
497 .with_config(base_config.blockchain_rpc_service.clone())
498 .with_storage(storage)
499 .with_broadcast_listener(remote_broadcast_listener)
500 .build();
501
502 let public_overlay = PublicOverlay::builder(zerostate.compute_public_overlay_id())
503 .named("blockchain_rpc")
504 .with_peer_resolver(self.peer_resolver.clone())
505 .build(blockchain_rpc_service.clone());
506 self.overlay_service.add_public_overlay(&public_overlay);
507
508 let blockchain_rpc_client = BlockchainRpcClient::builder()
509 .with_config(base_config.blockchain_rpc_client.clone())
510 .with_public_overlay_client(PublicOverlayClient::new(
511 self.network.clone(),
512 public_overlay,
513 base_config.public_overlay_client.clone(),
514 ))
515 .with_self_broadcast_listener(self_broadcast_listener)
516 .build();
517
518 tracing::info!(
519 overlay_id = %blockchain_rpc_client.overlay().overlay_id(),
520 "initialized blockchain rpc"
521 );
522
523 (blockchain_rpc_service, blockchain_rpc_client)
524 }
525}
526
527pub struct ConfiguredStorage {
528 pub context: StorageContext,
529 pub core_storage: CoreStorage,
530}
531
532impl ConfiguredStorage {
533 pub async fn new(
534 storage_config: &StorageConfig,
535 core_storage_config: &CoreStorageConfig,
536 ) -> Result<Self> {
537 let context = StorageContext::new(storage_config.clone())
538 .await
539 .context("failed to create storage context")?;
540 let core_storage = CoreStorage::open(context.clone(), core_storage_config.clone())
541 .await
542 .context("failed to create storage")?;
543 tracing::info!(
544 root_dir = %core_storage.context().root_dir().path().display(),
545 "initialized storage"
546 );
547
548 Ok(Self {
549 context,
550 core_storage,
551 })
552 }
553}