1use crate::node::anr::{Anr, NodeAnrs};
2use crate::node::peers::{HandshakeStatus, PeersSummary};
3use crate::node::protocol::*;
4use crate::node::protocol::{Catchup, CatchupHeight, Instruction, NewPhoneWhoDis, NewPhoneWhoDisReply};
5use crate::node::{anr, peers};
6use crate::socket::UdpSocketExt;
7use crate::utils::misc::Typename;
8use crate::utils::misc::{format_duration, get_unix_millis_now};
9use crate::{SystemStats, Ver, config, consensus, get_system_stats, metrics, node, utils};
10use serde::{Deserialize, Serialize};
11use serde_json::Value;
12use std::net::{Ipv4Addr, SocketAddr};
13use std::sync::Arc;
14use tracing::{debug, info, instrument, warn};
15
16#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
18#[serde(rename_all = "lowercase")]
19pub enum SoftforkStatus {
20 #[serde(rename = "")]
22 Healthy,
23 Minor,
25 Major,
27}
28
29#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
30pub enum Error {
31 #[error(transparent)]
32 Io(#[from] std::io::Error),
33 #[error(transparent)]
34 Fabric(#[from] consensus::fabric::Error),
35 #[error(transparent)]
36 Archiver(#[from] utils::archiver::Error),
37 #[error(transparent)]
38 Protocol(#[from] node::protocol::Error),
39 #[error(transparent)]
40 Config(#[from] config::Error),
41 #[error(transparent)]
42 Anr(#[from] anr::Error),
43 #[error(transparent)]
44 Peers(#[from] peers::Error),
45 #[error("{0}")]
46 String(String),
47}
48
49impl Typename for Error {
50 fn typename(&self) -> &'static str {
51 self.into()
52 }
53}
54
55pub struct Context {
57 pub(crate) config: config::Config,
58 pub(crate) metrics: metrics::Metrics,
59 pub(crate) reassembler: node::ReedSolomonReassembler,
60 pub(crate) node_peers: peers::NodePeers,
61 pub(crate) node_anrs: NodeAnrs,
62 pub(crate) fabric: crate::consensus::fabric::Fabric,
63 pub(crate) socket: Arc<dyn UdpSocketExt>,
64}
65
66impl Context {
67 pub async fn with_config_and_socket(
68 config: config::Config,
69 socket: Arc<dyn UdpSocketExt>,
70 ) -> Result<Arc<Self>, Error> {
71 use crate::config::{
72 ANR_PERIOD_MILLIS, BROADCAST_PERIOD_MILLIS, CATCHUP_PERIOD_MILLIS, CLEANUP_PERIOD_MILLIS,
73 CONSENSUS_PERIOD_MILLIS,
74 };
75 use crate::consensus::fabric::Fabric;
76 use crate::utils::archiver::init_storage;
77 use metrics::Metrics;
78 use node::ReedSolomonReassembler;
79 use tokio::time::{Duration, interval};
80
81 assert_ne!(config.get_root(), "");
82 init_storage(&config.get_root()).await?;
83
84 let fabric = Fabric::new(&config.get_root()).await?;
85
86 {
87 if let Some(rooted_hash) = fabric.get_rooted_hash()? {
90 if let Some(rooted_entry) = fabric.get_entry_by_hash(&rooted_hash) {
91 fabric.set_rooted(&rooted_entry)?;
92
93 if let Some(temporal_hash) = fabric.get_temporal_hash()? {
96 if let Some(temporal_entry) = fabric.get_entry_by_hash(&temporal_hash) {
97 fabric.set_temporal(&temporal_entry)?;
98 } else {
99 fabric.set_temporal(&rooted_entry)?;
101 }
102 } else {
103 fabric.set_temporal(&rooted_entry)?;
105 }
106 }
107 }
108 }
109
110 let metrics = Metrics::new();
111 let node_peers = peers::NodePeers::default();
112 let node_anrs = NodeAnrs::new();
113 let reassembler = ReedSolomonReassembler::new();
114
115 node_anrs.seed(&config).await; node_peers.seed(&fabric, &config, &node_anrs).await?;
117
118 let ctx = Arc::new(Self { config, metrics, reassembler, node_peers, node_anrs, fabric, socket });
119
120 tokio::spawn({
121 let ctx = ctx.clone();
122 async move {
123 if let Err(e) = ctx.bootstrap_task().await {
124 warn!("bootstrap task error: {e}");
125 ctx.metrics.add_error(&e);
126 }
127 }
128 });
129
130 tokio::spawn({
131 let ctx = ctx.clone();
132 async move {
133 let mut ticker = interval(Duration::from_millis(CLEANUP_PERIOD_MILLIS));
134 loop {
135 ticker.tick().await;
136 ctx.cleanup_task(CLEANUP_PERIOD_MILLIS / 1000).await;
137 }
138 }
139 });
140
141 tokio::spawn({
142 let ctx = ctx.clone();
143 async move {
144 let mut ticker = interval(Duration::from_millis(ANR_PERIOD_MILLIS));
145 ticker.tick().await;
146 loop {
147 ticker.tick().await;
148 if let Err(e) = ctx.anr_task().await {
149 warn!("anr task error: {e}");
150 ctx.metrics.add_error(&e);
151 }
152 }
153 }
154 });
155
156 tokio::spawn({
157 let ctx = ctx.clone();
158 async move {
159 let mut ticker = interval(Duration::from_millis(BROADCAST_PERIOD_MILLIS));
160 loop {
161 ticker.tick().await;
162 if let Err(e) = ctx.broadcast_task().await {
163 warn!("broadcast task error: {e}");
165 }
167 }
168 }
169 });
170
171 tokio::spawn({
172 let ctx = ctx.clone();
173 async move {
174 let mut ticker = interval(Duration::from_millis(CONSENSUS_PERIOD_MILLIS));
175 loop {
176 ticker.tick().await;
177 if let Err(e) = ctx.consensus_task().await {
178 warn!("consensus task error: {e}");
179 }
181 }
182 }
183 });
184
185 tokio::spawn({
186 let ctx = ctx.clone();
187 async move {
188 let mut ticker = interval(Duration::from_millis(CATCHUP_PERIOD_MILLIS));
189 loop {
190 ticker.tick().await;
191 if let Err(e) = ctx.catchup_task().await {
192 warn!("catchup task error: {e}");
193 }
195 }
196 }
197 });
198
199 Ok(ctx)
200 }
201
202 #[instrument(skip(self), name = "bootstrap_task")]
203 async fn bootstrap_task(&self) -> Result<(), Error> {
204 let new_phone_who_dis = NewPhoneWhoDis::new();
205
206 for ip in &self.config.seed_ips {
207 match new_phone_who_dis.send_to_with_metrics(self, *ip).await {
209 Ok(_) => {
210 debug!("sent encrypted new_phone_who_dis to seed {ip}");
211 }
212 Err(e) => {
213 warn!("failed to send encrypted new_phone_who_dis to seed {ip}: {e}");
215 }
216 }
217 self.node_peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
219 }
220
221 info!("sent new_phone_who_dis to {} seed nodes", self.config.seed_ips.len());
222 Ok(())
223 }
224
225 #[instrument(skip(self), name = "cleanup_task")]
226 async fn cleanup_task(&self, cleanup_secs: u64) {
227 let cleared_shards = self.reassembler.clear_stale(cleanup_secs).await;
228 let cleared_peers = self.node_peers.clear_stale(&self.fabric, &self.node_anrs).await;
229 if cleared_shards > 0 || cleared_peers > 0 {
230 debug!("cleared {} stale shards, {} stale peers", cleared_shards, cleared_peers);
231 }
232 self.fabric.cleanup().await;
233 }
234
235 #[instrument(skip(self), name = "anr_task")]
236 async fn anr_task(&self) -> Result<(), Error> {
237 let unverified_ips = self.node_anrs.get_random_not_verified(3).await;
238 if !unverified_ips.is_empty() {
239 let new_phone_who_dis = NewPhoneWhoDis::new();
240 for ip in &unverified_ips {
241 new_phone_who_dis.send_to_with_metrics(self, *ip).await?;
242 self.node_peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
243 }
244 }
245
246 let verified_ips = self.node_anrs.get_random_verified(3).await;
247 if !verified_ips.is_empty() {
248 let get_peer_anrs = GetPeerAnrs { has_peers_b3f4: self.node_anrs.get_all_b3f4().await };
249 for ip in &verified_ips {
250 self.send_message_to(&get_peer_anrs, *ip).await?;
251 }
252 }
253
254 info!("sent new_phone_who_dis to {} and get_peer_anrs to {} nodes", unverified_ips.len(), verified_ips.len());
255
256 Ok(())
257 }
258
259 #[instrument(skip(self), name = "broadcast_task")]
260 async fn broadcast_task(&self) -> Result<(), Error> {
261 let ping = Ping::new();
262 let tip = EventTip::from_current_tips_db(&self.fabric)?;
263
264 let my_ip = self.config.get_public_ipv4();
265 let peers = self.node_peers.get_all().await?;
266 if !peers.is_empty() {
267 let mut sent_count = 0;
268 for peer in peers {
269 if peer.ip != my_ip {
270 self.send_message_to(&ping, peer.ip).await?;
271 self.send_message_to(&tip, peer.ip).await?;
272 sent_count += 1;
273 }
274 }
275
276 debug!("sent {sent_count} ping and tip messages");
277 }
278
279 Ok(())
280 }
281
282 #[instrument(skip(self), name = "autoupdate_task")]
283 async fn autoupdate_task(&self) -> Result<(), Error> {
284 Ok(())
287 }
288
289 #[instrument(skip(self), name = "consensus_task")]
290 async fn consensus_task(&self) -> Result<(), Error> {
291 use consensus::consensus::{proc_consensus, proc_entries};
292
293 if let Err(e) = proc_entries(&self.fabric, &self.config, self).await {
295 warn!("proc_entries failed: {e}");
296 }
297
298 if let Err(e) = proc_consensus(&self.fabric) {
300 warn!("proc_consensus failed: {e}");
301 }
302
303 Ok(())
304 }
305
306 #[instrument(skip(self), name = "catchup_task")]
307 async fn catchup_task(&self) -> Result<(), Error> {
308 let temporal_height = match self.fabric.get_temporal_height() {
309 Ok(Some(h)) => h,
310 Ok(None) => 0,
311 Err(e) => return Err(e.into()),
312 };
313 let rooted_height = self.fabric.get_rooted_height()?.unwrap_or_default();
314 info!("Temporal: {} Rooted: {}", temporal_height, rooted_height);
315
316 let trainer_pks = self.fabric.trainers_for_height(temporal_height).unwrap_or_default();
317 let (peers_temporal, peers_rooted, peers_bft) = self.node_peers.get_heights(&trainer_pks).await?;
318
319 let behind_temporal = peers_temporal.saturating_sub(temporal_height);
320 let behind_rooted = peers_rooted.saturating_sub(rooted_height);
321 let behind_bft = peers_bft.saturating_sub(temporal_height);
322
323 if (temporal_height - rooted_height) > 1000 {
324 info!("Stopped syncing: getting {} consensuses starting {}", behind_rooted, rooted_height + 1);
327 let online_trainer_ips = self.node_peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
328 let heights: Vec<u32> = (rooted_height + 1..=peers_rooted).take(1000).collect();
329 let chunks: Vec<Vec<CatchupHeight>> = heights
330 .into_iter()
331 .map(|height| CatchupHeight { height, c: Some(true), e: None, a: None, hashes: None })
332 .collect::<Vec<_>>()
333 .chunks(200)
334 .map(|chunk| chunk.to_vec())
335 .collect();
336 self.fetch_chunks(chunks, online_trainer_ips).await?;
337 return Ok(());
338 }
339
340 if behind_bft > 0 {
341 info!("Behind BFT: Syncing {} entries", behind_bft);
342 let online_trainer_ips = self.node_peers.get_trainer_ips_above_rooted(peers_bft, &trainer_pks).await?;
343 let heights: Vec<u32> = (rooted_height + 1..=peers_bft).take(100).collect();
344 let chunks: Vec<Vec<CatchupHeight>> = heights
345 .into_iter()
346 .map(|height| CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: None })
347 .collect::<Vec<_>>()
348 .chunks(20)
349 .map(|chunk| chunk.to_vec())
350 .collect();
351 self.fetch_chunks(chunks, online_trainer_ips).await?;
352 return Ok(());
353 }
354
355 if behind_rooted > 0 {
356 info!("Behind rooted: Syncing {} entries", behind_rooted);
357 let online_trainer_ips = self.node_peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
358 let heights: Vec<u32> = (rooted_height + 1..=peers_rooted).take(1000).collect();
359 let chunks: Vec<Vec<CatchupHeight>> = heights
360 .into_iter()
361 .map(|height| {
362 let entries = self.fabric.entries_by_height(height as u64).unwrap_or_default();
363 let hashes = entries; CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: Some(hashes) }
365 })
366 .collect::<Vec<_>>()
367 .chunks(20)
368 .map(|chunk| chunk.to_vec())
369 .collect();
370 self.fetch_chunks(chunks, online_trainer_ips).await?;
371 return Ok(());
372 }
373
374 if behind_temporal > 0 {
375 info!("Behind temporal: Syncing {} entries", behind_temporal);
376 let online_trainer_ips =
377 self.node_peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
378 let heights: Vec<u32> = (temporal_height..=peers_temporal).take(1000).collect();
379 let chunks: Vec<Vec<CatchupHeight>> = heights
380 .into_iter()
381 .map(|height| {
382 let entries = self.fabric.entries_by_height(height as u64).unwrap_or_default();
383 let hashes = entries; CatchupHeight { height, c: None, e: Some(true), a: Some(true), hashes: Some(hashes) }
385 })
386 .collect::<Vec<_>>()
387 .chunks(10)
388 .map(|chunk| chunk.to_vec())
389 .collect();
390 self.fetch_chunks(chunks, online_trainer_ips).await?;
391 return Ok(());
392 }
393
394 if behind_temporal == 0 {
395 info!("In sync: Fetching attestations for last entry");
396 let online_trainer_ips =
397 self.node_peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
398 let entries = self.fabric.entries_by_height(temporal_height as u64).unwrap_or_default();
399 let hashes = entries;
400 let chunk = vec![CatchupHeight {
401 height: temporal_height,
402 c: None,
403 e: Some(true),
404 a: Some(true),
405 hashes: Some(hashes),
406 }];
407 self.fetch_chunks(vec![chunk], online_trainer_ips).await?;
408 }
409
410 Ok(())
411 }
412
413 async fn fetch_chunks(&self, chunks: Vec<Vec<CatchupHeight>>, peers: Vec<std::net::Ipv4Addr>) -> Result<(), Error> {
415 use rand::seq::SliceRandom;
416
417 let mut shuffled_peers = peers;
419 {
420 let mut rng = rand::rng();
421 shuffled_peers.shuffle(&mut rng);
422 }
423
424 for (chunk, peer_ip) in chunks.into_iter().zip(shuffled_peers.into_iter().cycle()) {
425 let catchup_msg = Catchup { heights: chunk };
426 if let Err(e) = catchup_msg.send_to_with_metrics(self, peer_ip).await {
427 warn!("Failed to send catchup to {}: {}", peer_ip, e);
428 }
429 }
430
431 Ok(())
432 }
433
434 pub fn get_prometheus_metrics(&self) -> String {
435 self.metrics.get_prometheus()
436 }
437
438 pub fn get_json_health(&self) -> Value {
439 serde_json::json!({
440 "status": "ok",
441 "version": env!("CARGO_PKG_VERSION"),
442 "uptime": self.metrics.get_uptime()
443 })
444 }
445
446 pub async fn send_message_to(&self, message: &impl Protocol, dst: Ipv4Addr) -> Result<(), Error> {
448 message.send_to_with_metrics(self, dst).await.map_err(Into::into)
449 }
450
451 pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
453 self.socket.recv_from_with_metrics(buf, &self.metrics).await
454 }
455
456 pub async fn is_peer_handshaked(&self, ip: Ipv4Addr) -> bool {
457 if let Some(peer) = self.node_peers.by_ip(ip).await {
458 if let Some(ref pk) = peer.pk {
459 if self.node_anrs.is_handshaked(pk).await {
460 return true;
461 }
462 }
463 }
464 false
465 }
466
467 pub async fn get_peers_summary(&self) -> Result<PeersSummary, Error> {
468 let my_ip = self.config.get_public_ipv4();
469 let temporal_height = self.get_temporal_height();
470 let trainer_pks = self.fabric.trainers_for_height(temporal_height as u32 + 1).unwrap_or_default();
471 self.node_peers.get_peers_summary(my_ip, &trainer_pks).await.map_err(Into::into)
472 }
473
474 pub fn get_softfork_status(&self) -> SoftforkStatus {
475 let temporal_height = self.get_temporal_height();
476 let rooted_height = self.get_rooted_height();
477 let gap = temporal_height.saturating_sub(rooted_height);
478
479 match gap {
480 0 | 1 => SoftforkStatus::Healthy,
481 2..=10 => SoftforkStatus::Minor,
482 _ => SoftforkStatus::Major,
483 }
484 }
485
486 pub fn get_config(&self) -> &config::Config {
487 &self.config
488 }
489
490 pub fn get_socket(&self) -> Arc<dyn UdpSocketExt> {
491 self.socket.clone()
492 }
493
494 pub fn get_metrics(&self) -> &metrics::Metrics {
495 &self.metrics
496 }
497
498 pub fn get_metrics_snapshot(&self) -> metrics::MetricsSnapshot {
499 self.metrics.get_snapshot()
500 }
501
502 pub fn get_system_stats(&self) -> SystemStats {
503 get_system_stats()
504 }
505
506 pub fn get_uptime(&self) -> String {
507 format_duration(self.metrics.get_uptime())
508 }
509
510 pub fn inc_tasks(&self) {
512 self.metrics.inc_tasks();
513 }
514
515 pub fn dec_tasks(&self) {
517 self.metrics.dec_tasks();
518 }
519
520 pub fn get_temporal_height(&self) -> u64 {
522 self.fabric.get_temporal_height().ok().flatten().unwrap_or_default() as u64
523 }
524
525 pub fn get_rooted_height(&self) -> u64 {
527 self.fabric.get_rooted_height().ok().flatten().unwrap_or_default() as u64
528 }
529
530 pub async fn get_entries(&self) -> Vec<(u64, u64, u64)> {
531 tokio::task::spawn_blocking(|| {
533 tokio::runtime::Handle::current().block_on(async {
534 consensus::doms::entry::get_archived_entries().await.unwrap_or_else(|_| {
535 vec![
537 (201, 20100123, 1024), (201, 20100456, 2048),
539 (202, 20200789, 1536),
540 (202, 20201012, 3072),
541 (203, 20300345, 2560),
542 ]
543 })
544 })
545 })
546 .await
547 .unwrap_or_else(|_| {
548 vec![
550 (201, 20100123, 1024),
551 (201, 20100456, 2048),
552 (202, 20200789, 1536),
553 (202, 20201012, 3072),
554 (203, 20300345, 2560),
555 ]
556 })
557 }
558
559 pub async fn set_peer_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), peers::Error> {
561 self.node_peers.set_handshake_status(ip, status).await
562 }
563
564 pub async fn update_peer_from_anr(
566 &self,
567 ip: Ipv4Addr,
568 pk: &[u8; 48],
569 version: &Ver,
570 status: Option<HandshakeStatus>,
571 ) {
572 self.node_peers.update_peer_from_anr(ip, pk, version, status).await
573 }
574
575 pub async fn get_all_anrs(&self) -> Vec<anr::Anr> {
577 self.node_anrs.get_all().await
578 }
579
580 pub async fn get_anr_by_pk_b58(&self, pk_b58: &str) -> Option<anr::Anr> {
582 if let Ok(pk_bytes) = bs58::decode(pk_b58).into_vec() {
583 if pk_bytes.len() == 48 {
584 let mut pk_array = [0u8; 48];
585 pk_array.copy_from_slice(&pk_bytes);
586 return self.get_anr_by_pk(&pk_array).await;
587 }
588 }
589 None
590 }
591
592 pub async fn get_anr_by_pk(&self, pk: &[u8; 48]) -> Option<anr::Anr> {
594 let all_anrs = self.node_anrs.get_all().await;
595 all_anrs.into_iter().find(|anr| anr.pk == *pk)
596 }
597
598 pub async fn get_validator_anrs(&self) -> Vec<anr::Anr> {
600 let all_anrs = self.node_anrs.get_all().await;
601 all_anrs.into_iter().filter(|anr| anr.handshaked).collect()
602 }
603
604 pub async fn parse_udp(&self, buf: &[u8], src: Ipv4Addr) -> Option<Box<dyn Protocol>> {
607 self.metrics.add_incoming_udp_packet(buf.len());
608
609 match self.reassembler.add_shard(buf, &self.config.get_sk()).await {
611 Ok(Some((packet, pk))) => match parse_etf_bin(&packet) {
612 Ok(proto) => {
613 self.node_peers.update_peer_from_proto(src, proto.typename()).await;
614 if matches!(proto.typename(), NewPhoneWhoDis::TYPENAME | NewPhoneWhoDisReply::TYPENAME)
615 || self.node_anrs.handshaked_and_valid_ip4(&pk, &src).await
616 {
617 self.metrics.add_incoming_proto(proto.typename());
618 return Some(proto);
619 }
620 self.node_anrs.unset_handshaked(&pk).await;
621 self.metrics.add_error(&Error::String(format!("handshake needed {src}")));
622 }
623 Err(e) => self.metrics.add_error(&e),
624 },
625 Ok(None) => {} Err(e) => self.metrics.add_error(&e),
627 }
628
629 None
630 }
631
632 pub async fn handle(&self, message: Box<dyn Protocol>, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
633 self.metrics.add_incoming_proto(message.typename());
634 message.handle(self, src).await.map_err(|e| {
635 warn!("can't handle {}: {e}", message.typename());
636 self.metrics.add_error(&e);
637 e.into()
638 })
639 }
640
641 pub async fn execute(&self, instruction: Instruction) -> Result<(), Error> {
642 let name = instruction.typename();
643 self.execute_inner(instruction).await.inspect_err(|e| warn!("can't execute {name}: {e}"))
644 }
645
646 pub async fn execute_inner(&self, instruction: Instruction) -> Result<(), Error> {
648 match instruction {
649 Instruction::Noop { why } => {
650 debug!("noop: {why}");
651 }
652
653 Instruction::SendNewPhoneWhoDisReply { dst } => {
654 let anr = Anr::from_config(&self.config)?;
655 let reply = NewPhoneWhoDisReply::new(anr);
656 self.send_message_to(&reply, dst).await?;
657 }
658
659 Instruction::SendGetPeerAnrsReply { dst, anrs } => {
660 let peers_v2 = GetPeerAnrsReply { anrs };
661 self.send_message_to(&peers_v2, dst).await?;
662 }
663
664 Instruction::SendPingReply { ts_m, dst } => {
665 let seen_time_ms = get_unix_millis_now();
666 let pong = PingReply { ts_m: ts_m, seen_time: seen_time_ms };
667 self.send_message_to(&pong, dst).await?;
668 }
669
670 Instruction::ValidTxs { txs } => {
671 info!("received {} valid transactions", txs.len());
673 }
675
676 Instruction::ReceivedSol { sol: _ } => {
677 info!("received solution");
679 }
687
688 Instruction::ReceivedEntry { entry } => {
689 let seen_time = get_unix_millis_now();
692 match entry.pack() {
693 Ok(entry_bin) => {
694 if let Err(e) = self.fabric.insert_entry(
695 &entry.hash,
696 entry.header.height,
697 entry.header.slot,
698 &entry_bin,
699 seen_time,
700 ) {
701 warn!("Failed to insert entry at height {}: {}", entry.header.height, e);
702 } else {
703 debug!("Successfully inserted entry at height {}", entry.header.height);
704 }
705 }
706 Err(e) => warn!("Failed to pack entry for insertion: {}", e),
707 }
708 }
709
710 Instruction::ReceivedAttestation { attestation } => {
711 info!("received attestation for entry {:?}", &attestation.entry_hash[..8]);
713 debug!("Attestation handling not fully implemented yet");
718 }
719
720 Instruction::ReceivedConsensus { consensus } => {
721 debug!(
723 "received consensus for entry {}, mutations_hash {}, score {:?}",
724 bs58::encode(&consensus.entry_hash).into_string(),
725 bs58::encode(&consensus.mutations_hash).into_string(),
726 consensus.score
727 );
728 let mask = match consensus.mask.clone() {
731 Some(m) => m,
732 None => {
733 if let Some(entry) = self.fabric.get_entry_by_hash(&consensus.entry_hash) {
735 if let Some(trainers) = self.fabric.trainers_for_height(entry.header.height) {
736 vec![true; trainers.len()]
738 } else {
739 warn!("No trainers found for height {}, skipping consensus", entry.header.height);
740 return Ok(());
741 }
742 } else {
743 warn!("Entry not found for consensus, skipping");
744 return Ok(());
745 }
746 }
747 };
748 let score = consensus.score.unwrap_or(1.0);
749 if let Err(e) = self.fabric.insert_consensus(
750 consensus.entry_hash,
751 consensus.mutations_hash,
752 mask,
753 consensus.agg_sig,
754 score,
755 ) {
756 warn!("Failed to insert consensus: {}", e);
757 } else {
758 debug!("Successfully inserted consensus with score {}", score);
759 }
760 }
761
762 Instruction::ConsensusesPacked { packed: _ } => {
763 info!("received consensus bulk");
765 }
770
771 Instruction::CatchupEntryReq { heights } => {
772 info!("received catchup entry request for {} heights", heights.len());
774 if heights.len() > 100 {
775 warn!("catchup entry request too large: {} heights", heights.len());
776 }
777 }
782
783 Instruction::CatchupTriReq { heights } => {
784 info!("received catchup tri request for {} heights", heights.len());
786 if heights.len() > 30 {
787 warn!("catchup tri request too large: {} heights", heights.len());
788 }
789 }
794
795 Instruction::CatchupBiReq { heights } => {
796 info!("received catchup bi request for {} heights", heights.len());
798 if heights.len() > 30 {
799 warn!("catchup bi request too large: {} heights", heights.len());
800 }
801 }
806
807 Instruction::CatchupAttestationReq { hashes } => {
808 info!("received catchup attestation request for {} hashes", hashes.len());
810 if hashes.len() > 30 {
811 warn!("catchup attestation request too large: {} hashes", hashes.len());
812 }
813 }
818
819 Instruction::SpecialBusiness { business: _ } => {
820 info!("received special business");
822 }
828
829 Instruction::SpecialBusinessReply { business: _ } => {
830 info!("received special business reply");
832 }
838
839 Instruction::SolicitEntry { hash: _ } => {
840 info!("received solicit entry request");
842 }
848
849 Instruction::SolicitEntry2 => {
850 info!("received solicit entry2 request");
852 }
858 };
859
860 Ok(())
861 }
862}
863
864#[cfg(test)]
865mod tests {
866 use super::*;
867 use crate::socket::MockSocket;
868
869 #[tokio::test(flavor = "multi_thread")]
870 async fn tokio_rwlock_allows_concurrent_reads() {
871 let lock = tokio::sync::RwLock::new(0usize);
873
874 let r1 = lock.read().await;
876 assert_eq!(*r1, 0);
877 let r2 = lock.try_read();
879 assert!(r2.is_ok(), "try_read should succeed when another reader holds the lock");
880 drop(r2);
882 drop(r1);
883
884 let mut w = lock.write().await;
886 *w += 1;
887 assert_eq!(*w, 1);
888 }
889
890 #[tokio::test]
891 async fn test_anr_verification_request_creation() {
892 use crate::utils::bls12_381 as bls;
894 use std::net::Ipv4Addr;
895
896 let sk = bls::generate_sk();
898 let pk = bls::get_public_key(&sk).expect("pk");
899 let pop = bls::sign(&sk, &pk, consensus::DST_POP).expect("pop");
900
901 let config = config::Config {
902 work_folder: "/tmp/test".to_string(),
903 version: Ver::new(1, 2, 3),
904 offline: false,
905 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
906 http_port: 3000,
907 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
908 udp_port: 36969,
909 public_ipv4: Some("127.0.0.1".to_string()),
910 seed_ips: Vec::new(),
911 seed_anrs: Vec::new(),
912 other_nodes: Vec::new(),
913 trust_factor: 0.8,
914 max_peers: 100,
915 trainer_sk: sk,
916 trainer_pk: pk,
917 trainer_pk_b58: String::new(),
918 trainer_pop: pop.to_vec(),
919 archival_node: false,
920 autoupdate: false,
921 computor_type: None,
922 snapshot_height: 0,
923 anr: None,
924 anr_desc: None,
925 anr_name: None,
926 };
927
928 let target_ip = Ipv4Addr::new(127, 0, 0, 1);
929
930 let my_anr =
932 Anr::build(&config.trainer_sk, &config.trainer_pk, &config.trainer_pop, target_ip, Ver::new(1, 0, 0));
933 assert!(my_anr.is_ok());
934 }
935
936 #[tokio::test]
937 async fn test_get_random_unverified_anrs() {
938 let registry = NodeAnrs::new();
940 let result = registry.get_random_not_verified(3).await;
941
942 assert!(result.len() <= 3);
945 }
946
947 #[tokio::test]
948 async fn test_cleanup_stale_manual_trigger() {
949 use crate::utils::bls12_381 as bls;
951 use std::net::Ipv4Addr;
952
953 let sk = bls::generate_sk();
955 let pk = bls::get_public_key(&sk).expect("pk");
956 let pop = bls::sign(&sk, &pk, consensus::DST_POP).expect("pop");
957
958 let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
960 let config = config::Config {
961 work_folder: format!("/tmp/test_cleanup_{}", unique_id),
962 version: Ver::new(1, 2, 3),
963 offline: false,
964 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
965 http_port: 3000,
966 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
967 udp_port: 36969,
968 public_ipv4: Some("127.0.0.1".to_string()),
969 seed_ips: vec!["127.0.0.1".parse().unwrap()],
970 seed_anrs: Vec::new(),
971 other_nodes: Vec::new(),
972 trust_factor: 0.8,
973 max_peers: 100,
974 trainer_sk: sk,
975 trainer_pk: pk,
976 trainer_pk_b58: String::new(),
977 trainer_pop: pop.to_vec(),
978 archival_node: false,
979 autoupdate: false,
980 computor_type: None,
981 snapshot_height: 0,
982 anr: None,
983 anr_desc: None,
984 anr_name: None,
985 };
986
987 let socket = Arc::new(MockSocket::new());
989 match Context::with_config_and_socket(config, socket).await {
990 Ok(ctx) => {
991 ctx.cleanup_task(8).await;
993 }
994 Err(_) => {
995 }
998 }
999 }
1000
1001 #[tokio::test]
1002 async fn test_bootstrap_handshake_manual_trigger() {
1003 use crate::utils::bls12_381 as bls;
1005 use std::net::Ipv4Addr;
1006
1007 let sk = bls::generate_sk();
1009 let pk = bls::get_public_key(&sk).expect("pk");
1010 let pop = bls::sign(&sk, &pk, consensus::DST_POP).expect("pop");
1011
1012 let work_folder = format!("/tmp/test_bootstrap_{}", std::process::id());
1013 let config = config::Config {
1014 work_folder,
1015 version: Ver::new(1, 2, 3),
1016 offline: false,
1017 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1018 http_port: 3000,
1019 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1020 udp_port: 36969,
1021 public_ipv4: Some("127.0.0.1".to_string()),
1022 seed_ips: vec!["127.0.0.1".parse().unwrap()], seed_anrs: Vec::new(),
1024 other_nodes: Vec::new(),
1025 trust_factor: 0.8,
1026 max_peers: 100,
1027 trainer_sk: sk,
1028 trainer_pk: pk,
1029 trainer_pk_b58: String::new(),
1030 trainer_pop: pop.to_vec(),
1031 archival_node: false,
1032 autoupdate: false,
1033 computor_type: None,
1034 snapshot_height: 0,
1035 anr: None,
1036 anr_desc: None,
1037 anr_name: None,
1038 };
1039
1040 let socket = Arc::new(MockSocket::new());
1042 let ctx = Context::with_config_and_socket(config, socket).await.expect("context creation");
1043
1044 match ctx.bootstrap_task().await {
1046 Ok(()) => {
1047 }
1049 Err(_e) => {
1050 }
1053 }
1054 }
1055
1056 #[tokio::test]
1057 async fn test_context_task_tracking() {
1058 use crate::utils::bls12_381 as bls;
1060 use std::net::Ipv4Addr;
1061
1062 let sk = bls::generate_sk();
1063 let pk = bls::get_public_key(&sk).expect("pk");
1064 let pop = bls::sign(&sk, &pk, consensus::DST_POP).expect("pop");
1065
1066 let config = config::Config {
1067 work_folder: "/tmp/test_tasks".to_string(),
1068 version: Ver::new(1, 2, 3),
1069 offline: false,
1070 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1071 http_port: 3000,
1072 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1073 udp_port: 36969,
1074 public_ipv4: Some("127.0.0.1".to_string()),
1075 seed_ips: Vec::new(),
1076 seed_anrs: Vec::new(),
1077 other_nodes: Vec::new(),
1078 trust_factor: 0.8,
1079 max_peers: 100,
1080 trainer_sk: sk,
1081 trainer_pk: pk,
1082 trainer_pk_b58: String::new(),
1083 trainer_pop: pop.to_vec(),
1084 archival_node: false,
1085 autoupdate: false,
1086 computor_type: None,
1087 snapshot_height: 0,
1088 anr: None,
1089 anr_desc: None,
1090 anr_name: None,
1091 };
1092
1093 let socket = Arc::new(MockSocket::new());
1094 let metrics = metrics::Metrics::new();
1095 let node_peers = peers::NodePeers::default();
1096 let node_anrs = crate::node::anr::NodeAnrs::new();
1097 let reassembler = node::ReedSolomonReassembler::new();
1098
1099 let fabric = crate::consensus::fabric::Fabric::new(&config.get_root()).await.unwrap();
1100 let ctx = Context { config, metrics, reassembler, node_peers, node_anrs, fabric, socket };
1101
1102 let snapshot = ctx.get_metrics_snapshot();
1104 assert_eq!(snapshot.tasks, 0);
1105
1106 ctx.inc_tasks();
1107 ctx.inc_tasks();
1108 let snapshot = ctx.get_metrics_snapshot();
1109 assert_eq!(snapshot.tasks, 2);
1110
1111 ctx.dec_tasks();
1112 let snapshot = ctx.get_metrics_snapshot();
1113 assert_eq!(snapshot.tasks, 1);
1114 }
1115
1116 #[tokio::test]
1117 async fn test_context_convenience_socket_functions() {
1118 use std::sync::Arc;
1120
1121 use crate::utils::bls12_381 as bls;
1122 use std::net::Ipv4Addr;
1123
1124 let sk = bls::generate_sk();
1125 let pk = bls::get_public_key(&sk).expect("pk");
1126 let pop = bls::sign(&sk, &pk, consensus::DST_POP).expect("pop");
1127
1128 let config = config::Config {
1129 work_folder: "/tmp/test_convenience".to_string(),
1130 version: Ver::new(1, 2, 3),
1131 offline: false,
1132 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1133 http_port: 3000,
1134 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1135 udp_port: 36969,
1136 public_ipv4: Some("127.0.0.1".to_string()),
1137 seed_ips: Vec::new(),
1138 seed_anrs: Vec::new(),
1139 other_nodes: Vec::new(),
1140 trust_factor: 0.8,
1141 max_peers: 100,
1142 trainer_sk: sk,
1143 trainer_pk: pk,
1144 trainer_pk_b58: String::new(),
1145 trainer_pop: pop.to_vec(),
1146 archival_node: false,
1147 autoupdate: false,
1148 computor_type: None,
1149 snapshot_height: 0,
1150 anr: None,
1151 anr_desc: None,
1152 anr_name: None,
1153 };
1154 let socket = Arc::new(MockSocket::new());
1155
1156 match Context::with_config_and_socket(config, socket).await {
1157 Ok(context) => {
1158 let mut buf = [0u8; 1024];
1159 let target: Ipv4Addr = "127.0.0.1".parse().unwrap();
1160
1161 let pong = PingReply { ts_m: 1234567890, seen_time: 1234567890123 };
1162 match context.send_message_to(&pong, target).await {
1164 Ok(_) => {
1165 }
1167 Err(_) => {
1168 }
1170 }
1171
1172 match context.recv_from(&mut buf).await {
1174 Ok(_) => {
1175 }
1177 Err(_) => {
1178 }
1180 }
1181 }
1182 Err(_) => {
1183 }
1185 }
1186 }
1187}