1use crate::node::anr::{Anr, NodeAnrs};
2use crate::node::peers::{HandshakeStatus, PeersSummary};
3use crate::node::protocol::*;
4use crate::node::protocol::{Catchup, CatchupHeight, Instruction, NewPhoneWhoDis, NewPhoneWhoDisReply, Typename};
5use crate::node::{anr, peers};
6use crate::socket::UdpSocketExt;
7use crate::utils::misc::format_duration;
8use crate::utils::{Hash, PublicKey};
9use crate::{SystemStats, Ver, config, consensus, get_system_stats, metrics, node, utils};
10use amadeus_utils::vecpak;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::net::{Ipv4Addr, SocketAddr};
14use std::sync::Arc;
15use tracing::{debug, info, instrument, warn};
16
/// Coarse chain-health indicator reported by `Context::get_softfork_status`,
/// which derives it from the gap between temporal and rooted height.
///
/// Variants serialize with lowercase names, except `Healthy`, which is
/// explicitly renamed to the empty string.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
#[serde(rename_all = "lowercase")]
pub enum SoftforkStatus {
    /// Serializes as `""` (explicit rename overrides `rename_all`).
    #[serde(rename = "")]
    Healthy,
    /// Serializes as `"minor"`.
    Minor,
    /// Serializes as `"major"`.
    Major,
}
29
/// Errors surfaced by the node [`Context`].
///
/// Every variant except `Other` transparently wraps an error from another
/// module and can be produced with `?` thanks to `#[from]`. The
/// `IntoStaticStr` derive supplies the `&Error -> &'static str` conversion
/// that the `Typename` implementation for this type relies on.
#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
pub enum Error {
    /// I/O failure from the standard library.
    #[error(transparent)]
    Io(#[from] std::io::Error),
    /// Failure reported by the consensus fabric.
    #[error(transparent)]
    Fabric(#[from] consensus::fabric::Error),
    /// Failure reported by the archiver utilities.
    #[error(transparent)]
    Archiver(#[from] utils::archiver::Error),
    /// Failure reported by the node protocol layer.
    #[error(transparent)]
    Protocol(#[from] node::protocol::Error),
    /// Failure reported by the configuration module.
    #[error(transparent)]
    Config(#[from] config::Error),
    /// Failure reported by the ANR module.
    #[error(transparent)]
    Anr(#[from] anr::Error),
    /// Failure reported by the peers module.
    #[error(transparent)]
    Peers(#[from] peers::Error),
    /// Free-form error; displayed as `other error: <message>`.
    #[error("other error: {0}")]
    Other(String),
}
49
/// Exposes a static name for each [`Error`] value.
impl Typename for Error {
    /// Returns the static string produced by the `IntoStaticStr` derive on
    /// [`Error`] (with strum's defaults this is the variant name).
    fn typename(&self) -> &'static str {
        self.into()
    }
}
55
/// Shared node state; `with_config_and_socket` wraps it in an `Arc` and hands
/// a clone to every background task.
pub struct Context {
    /// Node configuration supplied at construction.
    pub(crate) config: config::Config,
    /// Metrics registry (UDP packet, protocol and task counters, uptime).
    pub(crate) metrics: metrics::Metrics,
    /// Collects incoming UDP shards into complete packets (used by `parse_udp`).
    pub(crate) reassembler: node::ReedSolomonReassembler,
    /// Known peers, their heights and handshake status.
    pub(crate) peers: peers::NodePeers,
    /// ANR registry; also consulted for rate limits and handshake checks.
    pub(crate) anrs: NodeAnrs,
    /// Handle to chain/consensus storage.
    pub(crate) fabric: crate::consensus::fabric::Fabric,
    /// UDP socket abstraction; read by `recv_from` and exposed via `get_socket`.
    pub(crate) socket: Arc<dyn UdpSocketExt>,
}
65
66impl Context {
67 pub async fn with_config_and_socket(
68 config: config::Config,
69 socket: Arc<dyn UdpSocketExt>,
70 ) -> Result<Arc<Self>, Error> {
71 use crate::config::{
72 ANR_PERIOD_MILLIS, BROADCAST_PERIOD_MILLIS, CATCHUP_PERIOD_MILLIS, CLEANUP_PERIOD_MILLIS,
73 CONSENSUS_PERIOD_MILLIS,
74 };
75 use crate::consensus::fabric::Fabric;
76 use crate::utils::archiver::init_storage;
77 use metrics::Metrics;
78 use node::ReedSolomonReassembler;
79 use tokio::time::{Duration, interval};
80
81 assert_ne!(config.get_root(), "");
82 init_storage(&config.get_root()).await?;
83
84 let fabric = Fabric::new(&config.get_root()).await?;
85 let metrics = Metrics::new();
86 let node_peers = peers::NodePeers::default();
87 let node_anrs = NodeAnrs::new();
88 let reassembler = ReedSolomonReassembler::new();
89
90 node_anrs.seed(&config).await; node_peers.seed(&fabric, &config, &node_anrs).await?;
92
93 let ctx = Arc::new(Self { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket });
94
95 tokio::spawn({
96 let ctx = ctx.clone();
97 async move {
98 if let Err(e) = ctx.bootstrap_task().await {
99 warn!("bootstrap task error: {e}");
100 }
102 }
103 });
104
105 tokio::spawn({
106 let ctx = ctx.clone();
107 async move {
108 let mut ticker = interval(Duration::from_millis(CLEANUP_PERIOD_MILLIS));
109 loop {
110 ticker.tick().await;
111 ctx.cleanup_task().await;
112 }
113 }
114 });
115
116 tokio::spawn({
117 let ctx = ctx.clone();
118 async move {
119 let mut ticker = interval(Duration::from_millis(ANR_PERIOD_MILLIS));
120 ticker.tick().await;
121 loop {
122 ticker.tick().await;
123 if let Err(e) = ctx.anr_task().await {
124 warn!("anr task error: {e}");
125 }
127 }
128 }
129 });
130
131 tokio::spawn({
132 let ctx = ctx.clone();
133 async move {
134 let mut ticker = interval(Duration::from_millis(BROADCAST_PERIOD_MILLIS));
135 loop {
136 ticker.tick().await;
137 if let Err(e) = ctx.broadcast_task().await {
138 warn!("broadcast task error: {e}");
140 }
142 }
143 }
144 });
145
146 tokio::spawn({
147 let ctx = ctx.clone();
148 async move {
149 let mut ticker = interval(Duration::from_millis(CONSENSUS_PERIOD_MILLIS));
150 loop {
151 ticker.tick().await;
152 if let Err(e) = ctx.consensus_task().await {
153 warn!("consensus task error: {e}");
154 }
155 }
156 }
157 });
158
159 tokio::spawn({
160 let ctx = ctx.clone();
161 async move {
162 let mut is_syncing = false;
163 loop {
164 let tick_ms = if is_syncing { 30 } else { CATCHUP_PERIOD_MILLIS };
165 tokio::time::sleep(Duration::from_millis(tick_ms)).await;
166
167 match ctx.catchup_task().await {
168 Ok(syncing) => is_syncing = syncing,
169 Err(e) => warn!("catchup task error: {e}"),
170 }
171 }
172 }
173 });
174
175 tokio::spawn({
176 let ctx = ctx.clone();
177 async move {
178 let mut ticker = interval(Duration::from_millis(600_000));
179 loop {
180 ticker.tick().await;
181 if let Err(e) = ctx.autoupdate_task().await {
182 warn!("autoupdate task error: {e}");
183 }
185 }
186 }
187 });
188
189 Ok(ctx)
190 }
191
192 #[instrument(skip(self), name = "bootstrap_task")]
193 async fn bootstrap_task(&self) -> Result<(), Error> {
194 let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
195
196 for ip in &self.config.seed_ips {
197 match new_phone_who_dis.send_to_with_metrics(self, *ip).await {
199 Ok(_) => {
200 debug!("sent encrypted new_phone_who_dis to seed {ip}");
201 }
202 Err(e) => {
203 warn!("failed to send encrypted new_phone_who_dis to seed {ip}: {e}");
205 }
206 }
207 self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
209 }
210
211 info!("sent new_phone_who_dis to {} seed nodes", self.config.seed_ips.len());
212 Ok(())
213 }
214
215 #[instrument(skip(self), name = "cleanup_task")]
216 async fn cleanup_task(&self) {
217 self.anrs.update_rate_limiting_counters().await;
218 let cleared_shards = self.reassembler.clear_stale().await;
219 let cleared_peers = self.peers.clear_stale(&self.fabric, &self.anrs).await;
220 if cleared_shards > 0 || cleared_peers > 0 {
221 debug!("cleared {} stale shards, {} stale peers", cleared_shards, cleared_peers);
222 }
223 }
224
225 #[instrument(skip(self), name = "anr_task")]
226 async fn anr_task(&self) -> Result<(), Error> {
227 let unverified_ips = self.anrs.get_random_not_verified(3).await;
228 if !unverified_ips.is_empty() {
229 let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
230 for ip in &unverified_ips {
231 new_phone_who_dis.send_to_with_metrics(self, *ip).await?;
232 self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
233 }
234 }
235
236 let verified_ips = self.anrs.get_random_verified(3).await;
237 if !verified_ips.is_empty() {
238 let get_peer_anrs = Protocol::GetPeerAnrs(GetPeerAnrs::new(self.anrs.get_all_b3f4().await));
239 for ip in &verified_ips {
240 self.send_message_to(&get_peer_anrs, *ip).await?;
241 }
242 }
243
244 info!("sent new_phone_who_dis to {} and get_peer_anrs to {} nodes", unverified_ips.len(), verified_ips.len());
245
246 Ok(())
247 }
248
249 #[instrument(skip(self), name = "broadcast_task")]
250 async fn broadcast_task(&self) -> Result<(), Error> {
251 let ping = Protocol::Ping(Ping::new());
252 let tip = Protocol::EventTip(EventTip::from_current_tips_db(&self.fabric)?);
253
254 let my_ip = self.config.get_public_ipv4();
255 let peers = self.peers.get_all().await?;
256 if !peers.is_empty() {
257 let mut sent_count = 0;
258 for peer in peers {
259 if peer.ip != my_ip {
260 self.send_message_to(&ping, peer.ip).await?;
261 self.send_message_to(&tip, peer.ip).await?;
262 sent_count += 1;
263 }
264 }
265
266 debug!("sent {sent_count} ping and tip messages");
267 }
268
269 Ok(())
270 }
271
    /// Placeholder for the periodic autoupdate check: currently performs no
    /// work and always returns `Ok(())`.
    #[instrument(skip(self), name = "autoupdate_task")]
    async fn autoupdate_task(&self) -> Result<(), Error> {
        Ok(())
    }
278
279 #[instrument(skip(self), name = "consensus_task")]
280 async fn consensus_task(&self) -> Result<(), Error> {
281 use consensus::consensus::{proc_consensus, proc_entries};
282 self.fabric.start_proc_consensus();
283 if let Err(e) = proc_entries(&self.fabric, &self.config, self).await {
284 warn!("proc_entries failed: {e}");
285 }
286
287 if let Err(e) = proc_consensus(&self.fabric) {
288 warn!("proc_consensus failed: {e}");
289 }
290 self.fabric.stop_proc_consensus();
291 Ok(())
292 }
293
294 #[instrument(skip(self), name = "catchup_task")]
295 async fn catchup_task(&self) -> Result<bool, Error> {
296 if self.fabric.is_proc_consensus() {
297 return Ok(true);
298 }
299
300 let temporal_height = match self.fabric.get_temporal_height() {
301 Ok(Some(h)) => h,
302 Ok(None) => 0,
303 Err(e) => return Err(e.into()),
304 };
305 let rooted_height = self.fabric.get_rooted_height()?.unwrap_or_default();
306 info!("Temporal: {} Rooted: {}", temporal_height, rooted_height);
307
308 let trainer_pks = self.fabric.trainers_for_height(temporal_height).unwrap_or_default();
309 let (peers_temporal, peers_rooted, peers_bft) = self.peers.get_heights(&trainer_pks).await?;
310
311 let behind_temporal = peers_temporal.saturating_sub(temporal_height);
312 let behind_rooted = peers_rooted.saturating_sub(rooted_height);
313 let behind_bft = peers_bft.saturating_sub(temporal_height);
314 let rooting_stuck = temporal_height.saturating_sub(rooted_height) > 1000;
315
316 if rooting_stuck {
317 warn!(
318 "Stopped syncing: getting {} consensuses starting {}",
319 temporal_height - rooted_height,
320 rooted_height + 1
321 );
322 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(temporal_height, &trainer_pks).await?;
323 let heights: Vec<u64> = (rooted_height + 1..=temporal_height).take(200).collect();
324 let chunks: Vec<Vec<CatchupHeight>> = heights
325 .into_iter()
326 .map(|height| CatchupHeight { height, c: Some(true), e: None, a: None, hashes: None })
327 .collect::<Vec<_>>()
328 .chunks(20)
329 .map(|chunk| chunk.to_vec())
330 .collect();
331 self.fetch_heights(chunks, online_trainer_ips).await?;
332 return Ok(false);
333 }
334
335 if behind_bft > 0 {
336 info!("Behind BFT: Syncing {} entries", behind_bft);
337 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_bft, &trainer_pks).await?;
338 let heights: Vec<u64> = (rooted_height + 1..=peers_bft).take(200).collect();
339 let chunks: Vec<Vec<CatchupHeight>> = heights
340 .into_iter()
341 .map(|height| CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: None })
342 .collect::<Vec<_>>()
343 .chunks(20)
344 .map(|chunk| chunk.to_vec())
345 .collect();
346 self.fetch_heights(chunks, online_trainer_ips).await?;
347 return Ok(false);
348 }
349
350 if behind_rooted > 0 {
351 info!("Behind rooted: Syncing {} entries", behind_rooted);
352 let online_trainer_ips = self.peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
353 let heights: Vec<u64> = (rooted_height + 1..=peers_rooted).take(200).collect();
354 let chunks: Vec<Vec<CatchupHeight>> = heights
355 .into_iter()
356 .map(|height| {
357 let entries = self.fabric.entries_by_height(height).unwrap_or_default();
358 let hashes = entries;
359 CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: Some(hashes) }
360 })
361 .collect::<Vec<_>>()
362 .chunks(20)
363 .map(|chunk| chunk.to_vec())
364 .collect();
365 self.fetch_heights(chunks, online_trainer_ips).await?;
366 return Ok(false);
367 }
368
369 if behind_temporal > 0 {
370 info!("Behind temporal: Syncing {} entries", behind_temporal);
371 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
372 let heights: Vec<u64> = (temporal_height..=peers_temporal).take(200).collect();
373 let chunks: Vec<Vec<CatchupHeight>> = heights
374 .into_iter()
375 .map(|height| {
376 let entries = self.fabric.entries_by_height(height).unwrap_or_default();
377 let hashes = entries;
378 CatchupHeight { height, c: None, e: Some(true), a: Some(true), hashes: Some(hashes) }
379 })
380 .collect::<Vec<_>>()
381 .chunks(20)
382 .map(|chunk| chunk.to_vec())
383 .collect();
384 self.fetch_heights(chunks, online_trainer_ips).await?;
385 return Ok(false);
386 }
387
388 if behind_temporal == 0 {
389 info!("In sync: Fetching attestations for last entry");
390 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
391 let entries = self.fabric.entries_by_height(temporal_height).unwrap_or_default();
392 let hashes = entries;
393 let chunk = vec![CatchupHeight {
394 height: temporal_height,
395 c: None,
396 e: Some(true),
397 a: Some(true),
398 hashes: Some(hashes),
399 }];
400 self.fetch_heights(vec![chunk], online_trainer_ips).await?;
401 }
402
403 Ok(behind_temporal > 0 || behind_rooted > 0 || behind_bft > 0 || rooting_stuck)
404 }
405
406 async fn fetch_heights(
407 &self,
408 chunks: Vec<Vec<CatchupHeight>>,
409 peers: Vec<std::net::Ipv4Addr>,
410 ) -> Result<(), Error> {
411 use rand::seq::SliceRandom;
412 let mut shuffled_peers = peers;
413 {
414 let mut rng = rand::rng();
415 shuffled_peers.shuffle(&mut rng);
416 }
417
418 for (chunk, peer_ip) in chunks.into_iter().zip(shuffled_peers.into_iter().cycle()) {
419 Protocol::Catchup(Catchup { height_flags: chunk }).send_to_with_metrics(self, peer_ip).await?;
420 }
421 Ok(())
422 }
423
    /// Returns the string produced by the metrics registry's `get_prometheus`
    /// (presumably the Prometheus text exposition format; confirm against
    /// `Metrics`).
    pub fn get_prometheus_metrics(&self) -> String {
        self.metrics.get_prometheus()
    }
427
    /// Builds the health-check JSON object with three keys: a constant
    /// `"status": "ok"`, the crate version baked in at compile time, and the
    /// uptime as reported by the metrics registry.
    pub fn get_json_health(&self) -> Value {
        serde_json::json!({
            "status": "ok",
            "version": env!("CARGO_PKG_VERSION"),
            "uptime": self.metrics.get_uptime()
        })
    }
435
    /// Sends `message` to `dst` through `Protocol::send_to_with_metrics`.
    ///
    /// # Errors
    /// Returns the send error converted into [`Error`].
    pub async fn send_message_to(&self, message: &Protocol, dst: Ipv4Addr) -> Result<(), Error> {
        message.send_to_with_metrics(self, dst).await.map_err(Into::into)
    }
440
    /// Receives one datagram into `buf` from the node socket, passing the
    /// metrics registry along to the socket call.
    ///
    /// Returns the socket's `(usize, SocketAddr)` result unchanged
    /// (conventionally bytes read and sender address).
    pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
        self.socket.recv_from_with_metrics(buf, &self.metrics).await
    }
445
446 pub async fn is_peer_handshaked(&self, ip: Ipv4Addr) -> bool {
447 if let Some(peer) = self.peers.by_ip(ip).await {
448 if let Some(ref pk) = peer.pk {
449 if self.anrs.is_handshaked(pk.as_ref()).await {
450 return true;
451 }
452 }
453 }
454 false
455 }
456
457 pub async fn get_peers_summary(&self) -> Result<PeersSummary, Error> {
458 let my_ip = self.config.get_public_ipv4();
459 let temporal_height = self.get_temporal_height();
460 let trainer_pks = self.fabric.trainers_for_height(temporal_height + 1).unwrap_or_default();
461 self.peers.get_peers_summary(my_ip, &trainer_pks).await.map_err(Into::into)
462 }
463
464 pub fn get_softfork_status(&self) -> SoftforkStatus {
465 let temporal_height = self.get_temporal_height();
466 let rooted_height = self.get_rooted_height();
467 let gap = temporal_height.saturating_sub(rooted_height);
468
469 match gap {
470 0 | 1 => SoftforkStatus::Healthy,
471 2..=10 => SoftforkStatus::Minor,
472 _ => SoftforkStatus::Major,
473 }
474 }
475
    /// Borrows the node configuration.
    pub fn get_config(&self) -> &config::Config {
        &self.config
    }
479
    /// Returns another handle to the shared socket (clones the `Arc`, not the
    /// socket itself).
    pub fn get_socket(&self) -> Arc<dyn UdpSocketExt> {
        self.socket.clone()
    }
483
    /// Borrows the metrics registry.
    pub fn get_metrics(&self) -> &metrics::Metrics {
        &self.metrics
    }
487
    /// Returns the snapshot produced by the metrics registry's `get_snapshot`.
    pub fn get_metrics_snapshot(&self) -> metrics::MetricsSnapshot {
        self.metrics.get_snapshot()
    }
491
    /// Convenience wrapper around the crate-level `get_system_stats`; does not
    /// read any state from `self`.
    pub fn get_system_stats(&self) -> SystemStats {
        get_system_stats()
    }
495
    /// Returns the metrics registry's uptime rendered by `format_duration`.
    pub fn get_uptime(&self) -> String {
        format_duration(self.metrics.get_uptime())
    }
499
    /// Increments the task counter in the metrics registry.
    pub fn inc_tasks(&self) {
        self.metrics.inc_tasks();
    }
504
    /// Decrements the task counter in the metrics registry.
    pub fn dec_tasks(&self) {
        self.metrics.dec_tasks();
    }
509
    /// Returns the fabric's temporal height, or the default (zero) when the
    /// fabric has none or the lookup fails; the error itself is discarded.
    pub fn get_temporal_height(&self) -> u64 {
        self.fabric.get_temporal_height().ok().flatten().unwrap_or_default() as u64
    }
514
    /// Returns the fabric's rooted height, or the default (zero) when the
    /// fabric has none or the lookup fails; the error itself is discarded.
    pub fn get_rooted_height(&self) -> u64 {
        self.fabric.get_rooted_height().ok().flatten().unwrap_or_default() as u64
    }
519
520 pub async fn get_entries(&self) -> Vec<(u64, u64, u64)> {
521 tokio::task::spawn_blocking(|| {
523 tokio::runtime::Handle::current().block_on(async {
524 consensus::doms::entry::get_archived_entries().await.unwrap_or_else(|_| {
525 vec![
527 (201, 20100123, 1024), (201, 20100456, 2048),
529 (202, 20200789, 1536),
530 (202, 20201012, 3072),
531 (203, 20300345, 2560),
532 ]
533 })
534 })
535 })
536 .await
537 .unwrap_or_else(|_| {
538 vec![
540 (201, 20100123, 1024),
541 (201, 20100456, 2048),
542 (202, 20200789, 1536),
543 (202, 20201012, 3072),
544 (203, 20300345, 2560),
545 ]
546 })
547 }
548
    /// Sets the handshake status of the peer at `ip` in the peer registry.
    ///
    /// # Errors
    /// Returns the peer registry's error unchanged.
    pub async fn set_peer_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), peers::Error> {
        self.peers.set_handshake_status(ip, status).await
    }
553
    /// Forwards the given ANR-derived details (`ip`, `pk`, `version` and an
    /// optional handshake `status`) to the peer registry's
    /// `update_peer_from_anr`.
    pub async fn update_peer_from_anr(
        &self,
        ip: Ipv4Addr,
        pk: &PublicKey,
        version: &Ver,
        status: Option<HandshakeStatus>,
    ) {
        self.peers.update_peer_from_anr(ip, pk, version, status).await
    }
564
    /// Returns every ANR currently held by the ANR registry.
    pub async fn get_all_anrs(&self) -> Vec<anr::Anr> {
        self.anrs.get_all().await
    }
569
570 pub async fn get_anr_by_pk_b58(&self, pk_b58: &str) -> Option<anr::Anr> {
572 if let Ok(pk_bytes) = bs58::decode(pk_b58).into_vec() {
573 let pk_array: Result<[u8; 48], _> = pk_bytes.try_into();
574 if let Ok(pk_array) = pk_array {
575 let pk = PublicKey::from(pk_array);
576 return self.get_anr_by_pk(&pk).await;
577 }
578 }
579 None
580 }
581
582 pub async fn get_anr_by_pk(&self, pk: &PublicKey) -> Option<anr::Anr> {
584 let all_anrs = self.anrs.get_all().await;
585 all_anrs.into_iter().find(|anr| &anr.pk == pk)
586 }
587
588 pub async fn get_validator_anrs(&self) -> Vec<anr::Anr> {
590 let all_anrs = self.anrs.get_all().await;
591 all_anrs.into_iter().filter(|anr| anr.handshaked).collect()
592 }
593
594 pub fn get_entries_by_height(&self, height: u64) -> Result<Vec<consensus::doms::entry::Entry>, Error> {
596 let entries_raw = self.fabric.entries_by_height(height)?;
597 entries_raw
598 .into_iter()
599 .map(|raw| {
600 consensus::doms::entry::Entry::from_vecpak_bin(&raw)
601 .map_err(|_| Error::Other("entry not vecpak in fabric".into()))
602 })
603 .collect()
604 }
605
    /// Returns the fabric's temporal entry, if it has one.
    ///
    /// # Errors
    /// Returns the fabric error converted into [`Error`].
    pub fn get_temporal_entry(&self) -> Result<Option<consensus::doms::entry::Entry>, Error> {
        self.fabric.get_temporal_entry().map_err(Into::into)
    }
610
    /// Returns the fabric's trainer public keys for `height`, or `None` when
    /// the fabric reports none.
    pub fn get_trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
        self.fabric.trainers_for_height(height)
    }
615
    /// Returns the balance the fabric reports for `public_key` in the token
    /// identified by `symbol`.
    pub fn get_wallet_balance(&self, public_key: &PublicKey, symbol: &[u8]) -> i128 {
        self.fabric.chain_balance_symbol(public_key.as_ref(), symbol)
    }
620
621 pub fn get_contract_state(&self, contract: &PublicKey, key: &[u8]) -> Option<Vec<u8>> {
623 use amadeus_utils::constants::CF_CONTRACTSTATE;
624 let full_key = [b"bic:contract:" as &[u8], contract.as_ref(), b":" as &[u8], key].concat();
625 self.fabric.db().get(CF_CONTRACTSTATE, &full_key).ok().flatten()
626 }
627
    /// Returns the fabric's `chain_diff_bits` value cast to `u32`.
    pub fn get_chain_diff_bits(&self) -> u32 {
        self.fabric.chain_diff_bits() as u32
    }
632
    /// Returns the fabric's `chain_total_sols` value cast to `i128`.
    pub fn get_chain_total_sols(&self) -> i128 {
        self.fabric.chain_total_sols() as i128
    }
637
638 pub fn get_all_wallet_balances(&self, public_key: &PublicKey) -> Vec<(Vec<u8>, i128)> {
640 use amadeus_utils::constants::CF_CONTRACTSTATE;
641 let prefix = [b"account:" as &[u8], public_key.as_ref() as &[u8], b":balance:" as &[u8]].concat();
642 self.fabric
643 .db()
644 .iter_prefix(CF_CONTRACTSTATE, &prefix)
645 .unwrap_or_default()
646 .into_iter()
647 .filter_map(|(key, value)| {
648 if key.len() <= prefix.len() {
649 return None;
650 }
651 let symbol = key[prefix.len()..].to_vec();
652 let amount = std::str::from_utf8(&value).ok()?.parse::<i128>().ok()?;
653 if amount > 0 { Some((symbol, amount)) } else { None }
654 })
655 .collect()
656 }
657
    /// Returns the entry the fabric holds for `hash`, if any.
    pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<consensus::doms::entry::Entry> {
        self.fabric.get_entry_by_hash(hash)
    }
662
    /// Reads `key` from column family `cf` of the fabric database.
    ///
    /// # Errors
    /// A database failure is returned as `Error::Other` carrying the error's
    /// string form.
    pub fn db_get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
        self.fabric.db().get(cf, key).map_err(|e| Error::Other(e.to_string()))
    }
667
    /// Returns the `(key, value)` pairs that the fabric database yields for
    /// `prefix` in column family `cf`.
    ///
    /// # Errors
    /// A database failure is returned as `Error::Other` carrying the error's
    /// string form.
    pub fn db_iter_prefix(&self, cf: &str, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Error> {
        self.fabric.db().iter_prefix(cf, prefix).map_err(|e| Error::Other(e.to_string()))
    }
672
673 pub async fn parse_udp(&self, buf: &[u8], src: Ipv4Addr) -> Option<Protocol> {
676 self.metrics.add_incoming_udp_packet(buf.len());
677
678 if !self.anrs.is_within_udp_limit(src).await? {
679 return None; }
681
682 match self.reassembler.add_shard(buf, &self.config.get_sk()).await {
684 Ok(Some((packet, pk))) => {
685 match vecpak::from_slice::<Protocol>(&packet) {
686 Ok(proto) => {
687 self.peers.update_peer_from_proto(src, proto.typename()).await;
688 let is_allowed =
689 matches!(proto, Protocol::NewPhoneWhoDis(_) | Protocol::NewPhoneWhoDisReply(_))
690 || self.anrs.handshaked_and_valid_ip4(pk.as_ref(), &src).await;
691
692 if !is_allowed {
693 self.anrs.unset_handshaked(pk.as_ref()).await;
694 warn!("handshake needed {src}");
695 return None; }
697
698 if !self.anrs.is_within_proto_limit(pk.as_ref(), proto.typename()).await? {
699 return None; }
701
702 self.metrics.add_incoming_proto(proto.typename());
703 return Some(proto);
704 }
705 Err(e) => warn!("parse error: {e} {}", hex::encode(packet)),
706 }
707 }
708 Ok(None) => {} Err(e) => warn!("bad udp frame from {src} - {e}"),
710 }
711
712 None
713 }
714
    /// Counts the incoming message in the metrics, then lets it handle itself
    /// and returns the instructions it produced.
    ///
    /// NOTE(review): `parse_udp` also records `add_incoming_proto` for every
    /// message it returns; if callers feed `parse_udp` output into `handle`,
    /// each message is counted twice. Confirm which call site should own
    /// the counter.
    ///
    /// # Errors
    /// A handler failure is logged and returned converted into [`Error`].
    pub async fn handle(&self, message: Protocol, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
        self.metrics.add_incoming_proto(message.typename());
        message.handle(self, src).await.map_err(|e| {
            warn!("can't handle {}: {e}", message.typename());
            e.into()
        })
    }
722
    /// Executes `instruction` via `execute_inner`, logging a warning that
    /// names the instruction type if it fails.
    ///
    /// # Errors
    /// Returns whatever `execute_inner` returns.
    pub async fn execute(&self, instruction: Instruction) -> Result<(), Error> {
        // Capture the name first: `instruction` is moved into `execute_inner`.
        let name = instruction.typename();
        self.execute_inner(instruction).await.inspect_err(|e| warn!("can't execute {name}: {e}"))
    }
727
728 pub async fn execute_inner(&self, instruction: Instruction) -> Result<(), Error> {
730 match instruction {
731 Instruction::Noop { why } => {
732 debug!("noop: {why}");
733 }
734
735 Instruction::SendNewPhoneWhoDisReply { dst } => {
736 let anr = Anr::from_config(&self.config)?;
737 let reply = Protocol::NewPhoneWhoDisReply(NewPhoneWhoDisReply::new(anr));
738 self.send_message_to(&reply, dst).await?;
739 }
740
741 Instruction::SendGetPeerAnrsReply { dst, anrs } => {
742 let peers_v2 = Protocol::GetPeerAnrsReply(GetPeerAnrsReply::new(anrs));
743 self.send_message_to(&peers_v2, dst).await?;
744 }
745
746 Instruction::SendPingReply { ts_m, dst } => {
747 let pong = Protocol::PingReply(PingReply::new(ts_m));
748 self.send_message_to(&pong, dst).await?;
749 }
750
751 Instruction::ValidTxs { txs } => {
752 info!("received {} valid transactions", txs.len());
754 }
756
757 Instruction::ReceivedEntry { entry: _ } => {
758 }
760
761 Instruction::ReceivedAttestation { attestation } => {
762 info!("received attestation for entry {:?}", &attestation.entry_hash[..8]);
764 debug!("Attestation handling not fully implemented yet");
769 }
770
771 Instruction::ReceivedConsensus { consensus } => {
772 let _ = self.fabric.insert_consensus(&consensus);
773 }
774
775 Instruction::SpecialBusiness { business: _ } => {
776 info!("received special business");
778 }
784
785 Instruction::SpecialBusinessReply { business: _ } => {
786 info!("received special business reply");
788 }
794
795 Instruction::SolicitEntry { hash: _ } => {
796 info!("received solicit entry request");
798 }
804
805 Instruction::SolicitEntry2 => {
806 info!("received solicit entry2 request");
808 }
814 };
815
816 Ok(())
817 }
818}
819
#[cfg(test)]
mod tests {
    use super::*;
    use crate::socket::MockSocket;
    use crate::utils::bls12_381 as bls;
    use std::net::Ipv4Addr;

    /// Builds a loopback-only configuration with a freshly generated trainer
    /// key pair and proof of possession, rooted at `work_folder`.
    ///
    /// Seed lists are empty; tests that need seeds assign `seed_ips` on the
    /// returned value.
    fn test_config(work_folder: String) -> config::Config {
        let sk = bls::generate_sk();
        let pk = bls::get_public_key(&sk).expect("pk");
        let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");

        config::Config {
            work_folder,
            version: Ver::new(1, 2, 3),
            offline: false,
            http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
            http_port: 3000,
            udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
            udp_port: 36969,
            public_ipv4: Some("127.0.0.1".to_string()),
            seed_ips: Vec::new(),
            seed_anrs: Vec::new(),
            other_nodes: Vec::new(),
            trust_factor: 0.8,
            max_peers: 100,
            trainer_sk: sk,
            trainer_pk: pk,
            trainer_pk_b58: String::new(),
            trainer_pop: pop.to_vec(),
            archival_node: false,
            autoupdate: false,
            computor_type: None,
            snapshot_height: 0,
            anr: None,
            anr_desc: None,
            anr_name: None,
        }
    }

    /// Returns `/tmp/<prefix>_<pid>_<nanos>` so that concurrently running
    /// tests never share a work folder.
    fn unique_work_folder(prefix: &str) -> String {
        let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
        format!("/tmp/{}_{}", prefix, unique_id)
    }

    #[tokio::test(flavor = "multi_thread")]
    async fn tokio_rwlock_allows_concurrent_reads() {
        let lock = tokio::sync::RwLock::new(0usize);

        // A second reader must be admitted while the first guard is alive.
        let r1 = lock.read().await;
        assert_eq!(*r1, 0);
        let r2 = lock.try_read();
        assert!(r2.is_ok(), "try_read should succeed when another reader holds the lock");
        drop(r2);
        drop(r1);

        // With all readers gone the writer can proceed.
        let mut w = lock.write().await;
        *w += 1;
        assert_eq!(*w, 1);
    }

    #[tokio::test]
    async fn test_anr_verification_request_creation() {
        let config = test_config("/tmp/test".to_string());
        let target_ip = Ipv4Addr::new(127, 0, 0, 1);

        let my_anr =
            Anr::build(&config.trainer_sk, &config.trainer_pk, &config.trainer_pop, target_ip, Ver::new(1, 0, 0));
        assert!(my_anr.is_ok());
    }

    #[tokio::test]
    async fn test_get_random_unverified_anrs() {
        let registry = NodeAnrs::new();
        let result = registry.get_random_not_verified(3).await;

        assert!(result.len() <= 3);
    }

    #[tokio::test]
    async fn test_cleanup_stale_manual_trigger() {
        let mut config = test_config(unique_work_folder("test_cleanup"));
        config.seed_ips = vec!["127.0.0.1".parse().unwrap()];

        let socket = Arc::new(MockSocket::new());
        // Context creation may fail in constrained environments; the test only
        // requires that a manual cleanup run does not panic when it succeeds.
        if let Ok(ctx) = Context::with_config_and_socket(config, socket).await {
            ctx.cleanup_task().await;
        }
    }

    #[tokio::test]
    async fn test_bootstrap_handshake_manual_trigger() {
        let mut config = test_config(format!("/tmp/test_bootstrap_{}", std::process::id()));
        config.seed_ips = vec!["127.0.0.1".parse().unwrap()];

        let socket = Arc::new(MockSocket::new());
        let ctx = Context::with_config_and_socket(config, socket).await.expect("context creation");

        // Either outcome is accepted; the task just must not panic.
        let _ = ctx.bootstrap_task().await;
    }

    #[tokio::test]
    async fn test_context_task_tracking() {
        let config = test_config(unique_work_folder("test_tasks"));

        let socket = Arc::new(MockSocket::new());
        let metrics = metrics::Metrics::new();
        let node_peers = peers::NodePeers::default();
        let node_anrs = crate::node::anr::NodeAnrs::new();
        let reassembler = node::ReedSolomonReassembler::new();

        // Built by hand so that no background tasks are spawned.
        let fabric = crate::consensus::fabric::Fabric::new(&config.get_root()).await.unwrap();
        let ctx = Context { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket };

        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 0);

        ctx.inc_tasks();
        ctx.inc_tasks();
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 2);

        ctx.dec_tasks();
        let snapshot = ctx.get_metrics_snapshot();
        assert_eq!(snapshot.tasks, 1);
    }

    #[tokio::test]
    async fn test_context_convenience_socket_functions() {
        let config = test_config(unique_work_folder("test_convenience"));
        let socket = Arc::new(MockSocket::new());

        if let Ok(context) = Context::with_config_and_socket(config, socket).await {
            let mut buf = [0u8; 1024];
            let target: Ipv4Addr = "127.0.0.1".parse().unwrap();

            // Either outcome is accepted for both calls; they must not panic.
            let pong = Protocol::PingReply(PingReply { ts_m: 1234567890, seen_time: 1234567890123 });
            let _ = context.send_message_to(&pong, target).await;
            let _ = context.recv_from(&mut buf).await;
        }
    }
}