1use crate::node::anr::{Anr, NodeAnrs};
2use crate::node::peers::{HandshakeStatus, PeersSummary};
3use crate::node::protocol::*;
4use crate::node::protocol::{Catchup, CatchupHeight, Instruction, NewPhoneWhoDis, NewPhoneWhoDisReply, Typename};
5use crate::node::{anr, peers};
6use crate::socket::UdpSocketExt;
7use crate::utils::misc::format_duration;
8use crate::utils::{Hash, PublicKey};
9use crate::{SystemStats, Ver, config, consensus, get_system_stats, metrics, node, utils};
10use amadeus_utils::vecpak;
11use serde::{Deserialize, Serialize};
12use serde_json::Value;
13use std::net::{Ipv4Addr, SocketAddr};
14use std::sync::Arc;
15use tracing::{debug, info, instrument, warn};
16
17#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
19#[serde(rename_all = "lowercase")]
20pub enum SoftforkStatus {
21 #[serde(rename = "")]
23 Healthy,
24 Minor,
26 Major,
28}
29
30#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
31pub enum Error {
32 #[error(transparent)]
33 Io(#[from] std::io::Error),
34 #[error(transparent)]
35 Fabric(#[from] consensus::fabric::Error),
36 #[error(transparent)]
37 Archiver(#[from] utils::archiver::Error),
38 #[error(transparent)]
39 Protocol(#[from] node::protocol::Error),
40 #[error(transparent)]
41 Config(#[from] config::Error),
42 #[error(transparent)]
43 Anr(#[from] anr::Error),
44 #[error(transparent)]
45 Peers(#[from] peers::Error),
46 #[error("other error: {0}")]
47 Other(String),
48}
49
50impl Typename for Error {
51 fn typename(&self) -> &'static str {
52 self.into()
53 }
54}
55
56pub struct Context {
57 pub(crate) config: config::Config,
58 pub(crate) metrics: metrics::Metrics,
59 pub(crate) reassembler: node::ReedSolomonReassembler,
60 pub(crate) peers: peers::NodePeers,
61 pub(crate) anrs: NodeAnrs,
62 pub(crate) fabric: crate::consensus::fabric::Fabric,
63 pub(crate) socket: Arc<dyn UdpSocketExt>,
64}
65
66impl Context {
67 pub async fn with_config_and_socket(
68 config: config::Config,
69 socket: Arc<dyn UdpSocketExt>,
70 ) -> Result<Arc<Self>, Error> {
71 use crate::config::{
72 ANR_PERIOD_MILLIS, BROADCAST_PERIOD_MILLIS, CATCHUP_PERIOD_MILLIS, CLEANUP_PERIOD_MILLIS,
73 CONSENSUS_PERIOD_MILLIS,
74 };
75 use crate::consensus::fabric::Fabric;
76 use crate::utils::archiver::init_storage;
77 use metrics::Metrics;
78 use node::ReedSolomonReassembler;
79 use tokio::time::{Duration, interval};
80
81 assert_ne!(config.get_root(), "");
82 init_storage(&config.get_root()).await?;
83
84 let fabric = Fabric::new(&config.get_root()).await?;
85 let metrics = Metrics::new();
86 let node_peers = peers::NodePeers::default();
87 let node_anrs = NodeAnrs::new();
88 let reassembler = ReedSolomonReassembler::new();
89
90 node_anrs.seed(&config).await; node_peers.seed(&fabric, &config, &node_anrs).await?;
92
93 let ctx = Arc::new(Self { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket });
94
95 tokio::spawn({
96 let ctx = ctx.clone();
97 async move {
98 if let Err(e) = ctx.bootstrap_task().await {
99 warn!("bootstrap task error: {e}");
100 }
102 }
103 });
104
105 tokio::spawn({
106 let ctx = ctx.clone();
107 async move {
108 let mut ticker = interval(Duration::from_millis(CLEANUP_PERIOD_MILLIS));
109 loop {
110 ticker.tick().await;
111 ctx.cleanup_task().await;
112 }
113 }
114 });
115
116 tokio::spawn({
117 let ctx = ctx.clone();
118 async move {
119 let mut ticker = interval(Duration::from_millis(ANR_PERIOD_MILLIS));
120 ticker.tick().await;
121 loop {
122 ticker.tick().await;
123 if let Err(e) = ctx.anr_task().await {
124 warn!("anr task error: {e}");
125 }
127 }
128 }
129 });
130
131 tokio::spawn({
132 let ctx = ctx.clone();
133 async move {
134 let mut ticker = interval(Duration::from_millis(BROADCAST_PERIOD_MILLIS));
135 loop {
136 ticker.tick().await;
137 if let Err(e) = ctx.broadcast_task().await {
138 warn!("broadcast task error: {e}");
140 }
142 }
143 }
144 });
145
146 tokio::spawn({
147 let ctx = ctx.clone();
148 async move {
149 let mut ticker = interval(Duration::from_millis(CONSENSUS_PERIOD_MILLIS));
150 loop {
151 ticker.tick().await;
152 if let Err(e) = ctx.consensus_task().await {
153 warn!("consensus task error: {e}");
154 }
155 }
156 }
157 });
158
159 tokio::spawn({
160 let ctx = ctx.clone();
161 async move {
162 let mut is_syncing = false;
163 loop {
164 let tick_ms = if is_syncing { 30 } else { CATCHUP_PERIOD_MILLIS };
165 tokio::time::sleep(Duration::from_millis(tick_ms)).await;
166
167 match ctx.catchup_task().await {
168 Ok(syncing) => is_syncing = syncing,
169 Err(e) => warn!("catchup task error: {e}"),
170 }
171 }
172 }
173 });
174
175 tokio::spawn({
176 let ctx = ctx.clone();
177 async move {
178 let mut ticker = interval(Duration::from_millis(600_000));
179 loop {
180 ticker.tick().await;
181 if let Err(e) = ctx.autoupdate_task().await {
182 warn!("autoupdate task error: {e}");
183 }
185 }
186 }
187 });
188
189 Ok(ctx)
190 }
191
192 #[instrument(skip(self), name = "bootstrap_task")]
193 async fn bootstrap_task(&self) -> Result<(), Error> {
194 let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
195
196 for ip in &self.config.seed_ips {
197 match new_phone_who_dis.send_to_with_metrics(self, *ip).await {
199 Ok(_) => {
200 debug!("sent encrypted new_phone_who_dis to seed {ip}");
201 }
202 Err(e) => {
203 warn!("failed to send encrypted new_phone_who_dis to seed {ip}: {e}");
205 }
206 }
207 self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
209 }
210
211 info!("sent new_phone_who_dis to {} seed nodes", self.config.seed_ips.len());
212 Ok(())
213 }
214
215 #[instrument(skip(self), name = "cleanup_task")]
216 async fn cleanup_task(&self) {
217 self.anrs.update_rate_limiting_counters().await;
218 let cleared_shards = self.reassembler.clear_stale().await;
219 let cleared_peers = self.peers.clear_stale(&self.fabric, &self.anrs).await;
220 if cleared_shards > 0 || cleared_peers > 0 {
221 debug!("cleared {} stale shards, {} stale peers", cleared_shards, cleared_peers);
222 }
223 }
224
225 #[instrument(skip(self), name = "anr_task")]
226 async fn anr_task(&self) -> Result<(), Error> {
227 let unverified_ips = self.anrs.get_random_not_verified(3).await;
228 if !unverified_ips.is_empty() {
229 let new_phone_who_dis = Protocol::NewPhoneWhoDis(NewPhoneWhoDis::new());
230 for ip in &unverified_ips {
231 new_phone_who_dis.send_to_with_metrics(self, *ip).await?;
232 self.peers.set_handshake_status(*ip, HandshakeStatus::Initiated).await?;
233 }
234 }
235
236 let verified_ips = self.anrs.get_random_verified(3).await;
237 if !verified_ips.is_empty() {
238 let get_peer_anrs = Protocol::GetPeerAnrs(GetPeerAnrs::new(self.anrs.get_all_b3f4().await));
239 for ip in &verified_ips {
240 self.send_message_to(&get_peer_anrs, *ip).await?;
241 }
242 }
243
244 info!("sent new_phone_who_dis to {} and get_peer_anrs to {} nodes", unverified_ips.len(), verified_ips.len());
245
246 Ok(())
247 }
248
249 #[instrument(skip(self), name = "broadcast_task")]
250 async fn broadcast_task(&self) -> Result<(), Error> {
251 let ping = Protocol::Ping(Ping::new());
252 let tip = Protocol::EventTip(EventTip::from_current_tips_db(&self.fabric)?);
253
254 let my_ip = self.config.get_public_ipv4();
255 let peers = self.peers.get_all().await?;
256 if !peers.is_empty() {
257 let mut sent_count = 0;
258 for peer in peers {
259 if peer.ip != my_ip {
260 self.send_message_to(&ping, peer.ip).await?;
261 self.send_message_to(&tip, peer.ip).await?;
262 sent_count += 1;
263 }
264 }
265
266 debug!("sent {sent_count} ping and tip messages");
267 }
268
269 Ok(())
270 }
271
272 #[instrument(skip(self), name = "autoupdate_task")]
273 async fn autoupdate_task(&self) -> Result<(), Error> {
274 Ok(())
277 }
278
279 #[instrument(skip(self), name = "consensus_task")]
280 async fn consensus_task(&self) -> Result<(), Error> {
281 use consensus::consensus::{proc_consensus, proc_entries};
282 self.fabric.start_proc_consensus();
283 if let Err(e) = proc_entries(&self.fabric, &self.config, self).await {
284 warn!("proc_entries failed: {e}");
285 }
286
287 if let Err(e) = proc_consensus(&self.fabric) {
288 warn!("proc_consensus failed: {e}");
289 }
290 self.fabric.stop_proc_consensus();
291 Ok(())
292 }
293
294 #[instrument(skip(self), name = "catchup_task")]
295 async fn catchup_task(&self) -> Result<bool, Error> {
296 if self.fabric.is_proc_consensus() {
297 return Ok(true);
298 }
299
300 let temporal_height = match self.fabric.get_temporal_height() {
301 Ok(Some(h)) => h,
302 Ok(None) => 0,
303 Err(e) => return Err(e.into()),
304 };
305 let rooted_height = self.fabric.get_rooted_height()?.unwrap_or_default();
306 info!("Temporal: {} Rooted: {}", temporal_height, rooted_height);
307
308 let trainer_pks = self.fabric.trainers_for_height(temporal_height).unwrap_or_default();
309 let (peers_temporal, peers_rooted, peers_bft) = self.peers.get_heights(&trainer_pks).await?;
310
311 let behind_temporal = peers_temporal.saturating_sub(temporal_height);
312 let behind_rooted = peers_rooted.saturating_sub(rooted_height);
313 let behind_bft = peers_bft.saturating_sub(temporal_height);
314 let rooting_stuck = temporal_height.saturating_sub(rooted_height) > 1000;
315
316 if rooting_stuck {
317 warn!(
318 "Stopped syncing: getting {} consensuses starting {}",
319 temporal_height - rooted_height,
320 rooted_height + 1
321 );
322 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(temporal_height, &trainer_pks).await?;
323 let heights: Vec<u64> = (rooted_height + 1..=temporal_height).take(200).collect();
324 let chunks: Vec<Vec<CatchupHeight>> = heights
325 .into_iter()
326 .map(|height| CatchupHeight { height, c: Some(true), e: None, a: None, hashes: None })
327 .collect::<Vec<_>>()
328 .chunks(20)
329 .map(|chunk| chunk.to_vec())
330 .collect();
331 self.fetch_heights(chunks, online_trainer_ips).await?;
332 return Ok(false);
333 }
334
335 if behind_bft > 0 {
336 info!("Behind BFT: Syncing {} entries", behind_bft);
337 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_bft, &trainer_pks).await?;
338 let heights: Vec<u64> = (rooted_height + 1..=peers_bft).take(200).collect();
339 let chunks: Vec<Vec<CatchupHeight>> = heights
340 .into_iter()
341 .map(|height| CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: None })
342 .collect::<Vec<_>>()
343 .chunks(20)
344 .map(|chunk| chunk.to_vec())
345 .collect();
346 self.fetch_heights(chunks, online_trainer_ips).await?;
347 return Ok(false);
348 }
349
350 if behind_rooted > 0 {
351 info!("Behind rooted: Syncing {} entries", behind_rooted);
352 let online_trainer_ips = self.peers.get_trainer_ips_above_rooted(peers_rooted, &trainer_pks).await?;
353 let heights: Vec<u64> = (rooted_height + 1..=peers_rooted).take(200).collect();
354 let chunks: Vec<Vec<CatchupHeight>> = heights
355 .into_iter()
356 .map(|height| {
357 let entries = self.fabric.entries_by_height(height).unwrap_or_default();
358 let hashes = entries;
359 CatchupHeight { height, c: Some(true), e: Some(true), a: None, hashes: Some(hashes) }
360 })
361 .collect::<Vec<_>>()
362 .chunks(20)
363 .map(|chunk| chunk.to_vec())
364 .collect();
365 self.fetch_heights(chunks, online_trainer_ips).await?;
366 return Ok(false);
367 }
368
369 if behind_temporal > 0 {
370 info!("Behind temporal: Syncing {} entries", behind_temporal);
371 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
372 let heights: Vec<u64> = (temporal_height..=peers_temporal).take(200).collect();
373 let chunks: Vec<Vec<CatchupHeight>> = heights
374 .into_iter()
375 .map(|height| {
376 let entries = self.fabric.entries_by_height(height).unwrap_or_default();
377 let hashes = entries;
378 CatchupHeight { height, c: None, e: Some(true), a: Some(true), hashes: Some(hashes) }
379 })
380 .collect::<Vec<_>>()
381 .chunks(20)
382 .map(|chunk| chunk.to_vec())
383 .collect();
384 self.fetch_heights(chunks, online_trainer_ips).await?;
385 return Ok(false);
386 }
387
388 if behind_temporal == 0 {
389 info!("In sync: Fetching attestations for last entry");
390 let online_trainer_ips = self.peers.get_trainer_ips_above_temporal(peers_temporal, &trainer_pks).await?;
391 let entries = self.fabric.entries_by_height(temporal_height).unwrap_or_default();
392 let hashes = entries;
393 let chunk = vec![CatchupHeight {
394 height: temporal_height,
395 c: None,
396 e: Some(true),
397 a: Some(true),
398 hashes: Some(hashes),
399 }];
400 self.fetch_heights(vec![chunk], online_trainer_ips).await?;
401 }
402
403 Ok(behind_temporal > 0 || behind_rooted > 0 || behind_bft > 0 || rooting_stuck)
404 }
405
406 async fn fetch_heights(
407 &self,
408 chunks: Vec<Vec<CatchupHeight>>,
409 peers: Vec<std::net::Ipv4Addr>,
410 ) -> Result<(), Error> {
411 use rand::seq::SliceRandom;
412 let mut shuffled_peers = peers;
413 {
414 let mut rng = rand::rng();
415 shuffled_peers.shuffle(&mut rng);
416 }
417
418 for (chunk, peer_ip) in chunks.into_iter().zip(shuffled_peers.into_iter().cycle()) {
419 Protocol::Catchup(Catchup { height_flags: chunk }).send_to_with_metrics(self, peer_ip).await?;
420 }
421 Ok(())
422 }
423
424 pub fn get_prometheus_metrics(&self) -> String {
425 self.metrics.get_prometheus()
426 }
427
428 pub fn get_json_health(&self) -> Value {
429 serde_json::json!({
430 "status": "ok",
431 "version": env!("CARGO_PKG_VERSION"),
432 "uptime": self.metrics.get_uptime()
433 })
434 }
435
436 pub async fn send_message_to(&self, message: &Protocol, dst: Ipv4Addr) -> Result<(), Error> {
438 message.send_to_with_metrics(self, dst).await.map_err(Into::into)
439 }
440
441 pub async fn recv_from(&self, buf: &mut [u8]) -> std::io::Result<(usize, SocketAddr)> {
443 self.socket.recv_from_with_metrics(buf, &self.metrics).await
444 }
445
446 pub async fn is_peer_handshaked(&self, ip: Ipv4Addr) -> bool {
447 if let Some(peer) = self.peers.by_ip(ip).await {
448 if let Some(ref pk) = peer.pk {
449 if self.anrs.is_handshaked(pk.as_ref()).await {
450 return true;
451 }
452 }
453 }
454 false
455 }
456
457 pub async fn get_peers_summary(&self) -> Result<PeersSummary, Error> {
458 let my_ip = self.config.get_public_ipv4();
459 let temporal_height = self.get_temporal_height();
460 let trainer_pks = self.fabric.trainers_for_height(temporal_height + 1).unwrap_or_default();
461 self.peers.get_peers_summary(my_ip, &trainer_pks).await.map_err(Into::into)
462 }
463
464 pub fn get_softfork_status(&self) -> SoftforkStatus {
465 let temporal_height = self.get_temporal_height();
466 let rooted_height = self.get_rooted_height();
467 let gap = temporal_height.saturating_sub(rooted_height);
468
469 match gap {
470 0 | 1 => SoftforkStatus::Healthy,
471 2..=10 => SoftforkStatus::Minor,
472 _ => SoftforkStatus::Major,
473 }
474 }
475
476 pub fn get_config(&self) -> &config::Config {
477 &self.config
478 }
479
480 pub fn get_socket(&self) -> Arc<dyn UdpSocketExt> {
481 self.socket.clone()
482 }
483
484 pub fn get_metrics(&self) -> &metrics::Metrics {
485 &self.metrics
486 }
487
488 pub fn get_metrics_snapshot(&self) -> metrics::MetricsSnapshot {
489 self.metrics.get_snapshot()
490 }
491
492 pub fn get_system_stats(&self) -> SystemStats {
493 get_system_stats()
494 }
495
496 pub fn get_uptime(&self) -> String {
497 format_duration(self.metrics.get_uptime())
498 }
499
500 pub fn inc_tasks(&self) {
502 self.metrics.inc_tasks();
503 }
504
505 pub fn dec_tasks(&self) {
507 self.metrics.dec_tasks();
508 }
509
510 pub fn get_temporal_height(&self) -> u64 {
512 self.fabric.get_temporal_height().ok().flatten().unwrap_or_default() as u64
513 }
514
515 pub fn get_rooted_height(&self) -> u64 {
517 self.fabric.get_rooted_height().ok().flatten().unwrap_or_default() as u64
518 }
519
520 pub async fn get_entries(&self) -> Vec<(u64, u64, u64)> {
521 tokio::task::spawn_blocking(|| {
523 tokio::runtime::Handle::current().block_on(async {
524 consensus::doms::entry::get_archived_entries().await.unwrap_or_else(|_| {
525 vec![
527 (201, 20100123, 1024), (201, 20100456, 2048),
529 (202, 20200789, 1536),
530 (202, 20201012, 3072),
531 (203, 20300345, 2560),
532 ]
533 })
534 })
535 })
536 .await
537 .unwrap_or_else(|_| {
538 vec![
540 (201, 20100123, 1024),
541 (201, 20100456, 2048),
542 (202, 20200789, 1536),
543 (202, 20201012, 3072),
544 (203, 20300345, 2560),
545 ]
546 })
547 }
548
549 pub async fn set_peer_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), peers::Error> {
551 self.peers.set_handshake_status(ip, status).await
552 }
553
554 pub async fn update_peer_from_anr(
556 &self,
557 ip: Ipv4Addr,
558 pk: &PublicKey,
559 version: &Ver,
560 status: Option<HandshakeStatus>,
561 ) {
562 self.peers.update_peer_from_anr(ip, pk, version, status).await
563 }
564
565 pub async fn get_all_anrs(&self) -> Vec<anr::Anr> {
567 self.anrs.get_all().await
568 }
569
570 pub async fn get_anr_by_pk_b58(&self, pk_b58: &str) -> Option<anr::Anr> {
572 if let Ok(pk_bytes) = bs58::decode(pk_b58).into_vec() {
573 let pk_array: Result<[u8; 48], _> = pk_bytes.try_into();
574 if let Ok(pk_array) = pk_array {
575 let pk = PublicKey::from(pk_array);
576 return self.get_anr_by_pk(&pk).await;
577 }
578 }
579 None
580 }
581
582 pub async fn get_anr_by_pk(&self, pk: &PublicKey) -> Option<anr::Anr> {
584 let all_anrs = self.anrs.get_all().await;
585 all_anrs.into_iter().find(|anr| &anr.pk == pk)
586 }
587
588 pub async fn get_validator_anrs(&self) -> Vec<anr::Anr> {
590 let all_anrs = self.anrs.get_all().await;
591 all_anrs.into_iter().filter(|anr| anr.handshaked).collect()
592 }
593
594 pub fn get_entries_by_height(&self, height: u64) -> Result<Vec<consensus::doms::entry::Entry>, Error> {
596 let entries_raw = self.fabric.entries_by_height(height)?;
597 entries_raw
598 .into_iter()
599 .map(|raw| {
600 consensus::doms::entry::Entry::from_vecpak_bin(&raw)
601 .map_err(|_| Error::Other("entry not vecpak in fabric".into()))
602 })
603 .collect()
604 }
605
606 pub fn get_temporal_entry(&self) -> Result<Option<consensus::doms::entry::Entry>, Error> {
608 self.fabric.get_temporal_entry().map_err(Into::into)
609 }
610
611 pub fn get_trainers_for_height(&self, height: u64) -> Option<Vec<PublicKey>> {
613 self.fabric.trainers_for_height(height)
614 }
615
616 pub fn get_public_key(&self) -> PublicKey {
618 self.config.get_pk()
619 }
620
621 pub fn get_wallet_balance(&self, public_key: &PublicKey, symbol: &[u8]) -> i128 {
623 self.fabric.chain_balance_symbol(public_key.as_ref(), symbol)
624 }
625
626 pub fn get_contract_state(&self, contract: &PublicKey, key: &[u8]) -> Option<Vec<u8>> {
628 use amadeus_utils::constants::CF_CONTRACTSTATE;
629 let full_key = [b"bic:contract:" as &[u8], contract.as_ref(), b":" as &[u8], key].concat();
630 self.fabric.db().get(CF_CONTRACTSTATE, &full_key).ok().flatten()
631 }
632
633 pub fn get_chain_diff_bits(&self) -> u32 {
635 self.fabric.chain_diff_bits() as u32
636 }
637
638 pub fn get_chain_total_sols(&self) -> i128 {
640 self.fabric.chain_total_sols() as i128
641 }
642
643 pub fn get_all_wallet_balances(&self, public_key: &PublicKey) -> Vec<(Vec<u8>, i128)> {
645 use amadeus_utils::constants::CF_CONTRACTSTATE;
646 let prefix = [b"account:" as &[u8], public_key.as_ref() as &[u8], b":balance:" as &[u8]].concat();
647 self.fabric
648 .db()
649 .iter_prefix(CF_CONTRACTSTATE, &prefix)
650 .unwrap_or_default()
651 .into_iter()
652 .filter_map(|(key, value)| {
653 if key.len() <= prefix.len() {
654 return None;
655 }
656 let symbol = key[prefix.len()..].to_vec();
657 let amount = std::str::from_utf8(&value).ok()?.parse::<i128>().ok()?;
658 if amount > 0 { Some((symbol, amount)) } else { None }
659 })
660 .collect()
661 }
662
663 pub fn get_entry_by_hash(&self, hash: &Hash) -> Option<consensus::doms::entry::Entry> {
665 self.fabric.get_entry_by_hash(hash)
666 }
667
668 pub fn db_get(&self, cf: &str, key: &[u8]) -> Result<Option<Vec<u8>>, Error> {
670 self.fabric.db().get(cf, key).map_err(|e| Error::Other(e.to_string()))
671 }
672
673 pub fn db_iter_prefix(&self, cf: &str, prefix: &[u8]) -> Result<Vec<(Vec<u8>, Vec<u8>)>, Error> {
675 self.fabric.db().iter_prefix(cf, prefix).map_err(|e| Error::Other(e.to_string()))
676 }
677
678 pub async fn parse_udp(&self, buf: &[u8], src: Ipv4Addr) -> Option<Protocol> {
681 self.metrics.add_incoming_udp_packet(buf.len());
682
683 if !self.anrs.is_within_udp_limit(src).await? {
684 return None; }
686
687 match self.reassembler.add_shard(buf, &self.config.get_sk()).await {
689 Ok(Some((packet, pk))) => {
690 match vecpak::from_slice::<Protocol>(&packet) {
691 Ok(proto) => {
692 self.peers.update_peer_from_proto(src, proto.typename()).await;
693 let is_allowed =
694 matches!(proto, Protocol::NewPhoneWhoDis(_) | Protocol::NewPhoneWhoDisReply(_))
695 || self.anrs.handshaked_and_valid_ip4(pk.as_ref(), &src).await;
696
697 if !is_allowed {
698 self.anrs.unset_handshaked(pk.as_ref()).await;
699 warn!("handshake needed {src}");
700 return None; }
702
703 if !self.anrs.is_within_proto_limit(pk.as_ref(), proto.typename()).await? {
704 return None; }
706
707 self.metrics.add_incoming_proto(proto.typename());
708 return Some(proto);
709 }
710 Err(e) => warn!("parse error: {e} {}", hex::encode(packet)),
711 }
712 }
713 Ok(None) => {} Err(e) => warn!("bad udp frame from {src} - {e}"),
715 }
716
717 None
718 }
719
720 pub async fn handle(&self, message: Protocol, src: Ipv4Addr) -> Result<Vec<Instruction>, Error> {
721 self.metrics.add_incoming_proto(message.typename());
722 message.handle(self, src).await.map_err(|e| {
723 warn!("can't handle {}: {e}", message.typename());
724 e.into()
725 })
726 }
727
728 pub async fn execute(&self, instruction: Instruction) -> Result<(), Error> {
729 let name = instruction.typename();
730 self.execute_inner(instruction).await.inspect_err(|e| warn!("can't execute {name}: {e}"))
731 }
732
733 pub async fn execute_inner(&self, instruction: Instruction) -> Result<(), Error> {
735 match instruction {
736 Instruction::Noop { why } => {
737 debug!("noop: {why}");
738 }
739
740 Instruction::SendNewPhoneWhoDisReply { dst } => {
741 let anr = Anr::from_config(&self.config)?;
742 let reply = Protocol::NewPhoneWhoDisReply(NewPhoneWhoDisReply::new(anr));
743 self.send_message_to(&reply, dst).await?;
744 }
745
746 Instruction::SendGetPeerAnrsReply { dst, anrs } => {
747 let peers_v2 = Protocol::GetPeerAnrsReply(GetPeerAnrsReply::new(anrs));
748 self.send_message_to(&peers_v2, dst).await?;
749 }
750
751 Instruction::SendPingReply { ts_m, dst } => {
752 let pong = Protocol::PingReply(PingReply::new(ts_m));
753 self.send_message_to(&pong, dst).await?;
754 }
755
756 Instruction::ValidTxs { txs } => {
757 info!("received {} valid transactions", txs.len());
759 }
761
762 Instruction::ReceivedEntry { entry: _ } => {
763 }
765
766 Instruction::ReceivedAttestation { attestation } => {
767 info!("received attestation for entry {:?}", &attestation.entry_hash[..8]);
769 debug!("Attestation handling not fully implemented yet");
774 }
775
776 Instruction::ReceivedConsensus { consensus } => {
777 let _ = self.fabric.insert_consensus(&consensus);
778 }
779
780 Instruction::SpecialBusiness { business: _ } => {
781 info!("received special business");
783 }
789
790 Instruction::SpecialBusinessReply { business: _ } => {
791 info!("received special business reply");
793 }
799
800 Instruction::SolicitEntry { hash: _ } => {
801 info!("received solicit entry request");
803 }
809
810 Instruction::SolicitEntry2 => {
811 info!("received solicit entry2 request");
813 }
819 };
820
821 Ok(())
822 }
823}
824
825#[cfg(test)]
826mod tests {
827 use super::*;
828 use crate::socket::MockSocket;
829
830 #[tokio::test(flavor = "multi_thread")]
831 async fn tokio_rwlock_allows_concurrent_reads() {
832 let lock = tokio::sync::RwLock::new(0usize);
834
835 let r1 = lock.read().await;
837 assert_eq!(*r1, 0);
838 let r2 = lock.try_read();
840 assert!(r2.is_ok(), "try_read should succeed when another reader holds the lock");
841 drop(r2);
843 drop(r1);
844
845 let mut w = lock.write().await;
847 *w += 1;
848 assert_eq!(*w, 1);
849 }
850
851 #[tokio::test]
852 async fn test_anr_verification_request_creation() {
853 use crate::utils::bls12_381 as bls;
855 use std::net::Ipv4Addr;
856
857 let sk = bls::generate_sk();
859 let pk = bls::get_public_key(&sk).expect("pk");
860 let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
861
862 let config = config::Config {
863 work_folder: "/tmp/test".to_string(),
864 version: Ver::new(1, 2, 3),
865 offline: false,
866 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
867 http_port: 3000,
868 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
869 udp_port: 36969,
870 public_ipv4: Some("127.0.0.1".to_string()),
871 seed_ips: Vec::new(),
872 seed_anrs: Vec::new(),
873 other_nodes: Vec::new(),
874 trust_factor: 0.8,
875 max_peers: 100,
876 trainer_sk: sk,
877 trainer_pk: pk,
878 trainer_pk_b58: String::new(),
879 trainer_pop: pop.to_vec(),
880 archival_node: false,
881 autoupdate: false,
882 computor_type: None,
883 snapshot_height: 0,
884 anr: None,
885 anr_desc: None,
886 anr_name: None,
887 };
888
889 let target_ip = Ipv4Addr::new(127, 0, 0, 1);
890
891 let my_anr =
893 Anr::build(&config.trainer_sk, &config.trainer_pk, &config.trainer_pop, target_ip, Ver::new(1, 0, 0));
894 assert!(my_anr.is_ok());
895 }
896
897 #[tokio::test]
898 async fn test_get_random_unverified_anrs() {
899 let registry = NodeAnrs::new();
901 let result = registry.get_random_not_verified(3).await;
902
903 assert!(result.len() <= 3);
906 }
907
908 #[tokio::test]
909 async fn test_cleanup_stale_manual_trigger() {
910 use crate::utils::bls12_381 as bls;
912 use std::net::Ipv4Addr;
913
914 let sk = bls::generate_sk();
916 let pk = bls::get_public_key(&sk).expect("pk");
917 let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
918
919 let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
921 let config = config::Config {
922 work_folder: format!("/tmp/test_cleanup_{}", unique_id),
923 version: Ver::new(1, 2, 3),
924 offline: false,
925 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
926 http_port: 3000,
927 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
928 udp_port: 36969,
929 public_ipv4: Some("127.0.0.1".to_string()),
930 seed_ips: vec!["127.0.0.1".parse().unwrap()],
931 seed_anrs: Vec::new(),
932 other_nodes: Vec::new(),
933 trust_factor: 0.8,
934 max_peers: 100,
935 trainer_sk: sk,
936 trainer_pk: pk,
937 trainer_pk_b58: String::new(),
938 trainer_pop: pop.to_vec(),
939 archival_node: false,
940 autoupdate: false,
941 computor_type: None,
942 snapshot_height: 0,
943 anr: None,
944 anr_desc: None,
945 anr_name: None,
946 };
947
948 let socket = Arc::new(MockSocket::new());
950 match Context::with_config_and_socket(config, socket).await {
951 Ok(ctx) => {
952 ctx.cleanup_task().await;
954 }
955 Err(_) => {
956 }
959 }
960 }
961
962 #[tokio::test]
963 async fn test_bootstrap_handshake_manual_trigger() {
964 use crate::utils::bls12_381 as bls;
966 use std::net::Ipv4Addr;
967
968 let sk = bls::generate_sk();
970 let pk = bls::get_public_key(&sk).expect("pk");
971 let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
972
973 let work_folder = format!("/tmp/test_bootstrap_{}", std::process::id());
974 let config = config::Config {
975 work_folder,
976 version: Ver::new(1, 2, 3),
977 offline: false,
978 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
979 http_port: 3000,
980 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
981 udp_port: 36969,
982 public_ipv4: Some("127.0.0.1".to_string()),
983 seed_ips: vec!["127.0.0.1".parse().unwrap()], seed_anrs: Vec::new(),
985 other_nodes: Vec::new(),
986 trust_factor: 0.8,
987 max_peers: 100,
988 trainer_sk: sk,
989 trainer_pk: pk,
990 trainer_pk_b58: String::new(),
991 trainer_pop: pop.to_vec(),
992 archival_node: false,
993 autoupdate: false,
994 computor_type: None,
995 snapshot_height: 0,
996 anr: None,
997 anr_desc: None,
998 anr_name: None,
999 };
1000
1001 let socket = Arc::new(MockSocket::new());
1003 let ctx = Context::with_config_and_socket(config, socket).await.expect("context creation");
1004
1005 match ctx.bootstrap_task().await {
1007 Ok(()) => {
1008 }
1010 Err(_e) => {
1011 }
1014 }
1015 }
1016
1017 #[tokio::test]
1018 async fn test_context_task_tracking() {
1019 use crate::utils::bls12_381 as bls;
1021 use std::net::Ipv4Addr;
1022
1023 let sk = bls::generate_sk();
1024 let pk = bls::get_public_key(&sk).expect("pk");
1025 let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
1026
1027 let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
1028 let config = config::Config {
1029 work_folder: format!("/tmp/test_tasks_{}", unique_id),
1030 version: Ver::new(1, 2, 3),
1031 offline: false,
1032 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1033 http_port: 3000,
1034 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1035 udp_port: 36969,
1036 public_ipv4: Some("127.0.0.1".to_string()),
1037 seed_ips: Vec::new(),
1038 seed_anrs: Vec::new(),
1039 other_nodes: Vec::new(),
1040 trust_factor: 0.8,
1041 max_peers: 100,
1042 trainer_sk: sk,
1043 trainer_pk: pk,
1044 trainer_pk_b58: String::new(),
1045 trainer_pop: pop.to_vec(),
1046 archival_node: false,
1047 autoupdate: false,
1048 computor_type: None,
1049 snapshot_height: 0,
1050 anr: None,
1051 anr_desc: None,
1052 anr_name: None,
1053 };
1054
1055 let socket = Arc::new(MockSocket::new());
1056 let metrics = metrics::Metrics::new();
1057 let node_peers = peers::NodePeers::default();
1058 let node_anrs = crate::node::anr::NodeAnrs::new();
1059 let reassembler = node::ReedSolomonReassembler::new();
1060
1061 let fabric = crate::consensus::fabric::Fabric::new(&config.get_root()).await.unwrap();
1062 let ctx = Context { config, metrics, reassembler, peers: node_peers, anrs: node_anrs, fabric, socket };
1063
1064 let snapshot = ctx.get_metrics_snapshot();
1066 assert_eq!(snapshot.tasks, 0);
1067
1068 ctx.inc_tasks();
1069 ctx.inc_tasks();
1070 let snapshot = ctx.get_metrics_snapshot();
1071 assert_eq!(snapshot.tasks, 2);
1072
1073 ctx.dec_tasks();
1074 let snapshot = ctx.get_metrics_snapshot();
1075 assert_eq!(snapshot.tasks, 1);
1076 }
1077
1078 #[tokio::test]
1079 async fn test_context_convenience_socket_functions() {
1080 use std::sync::Arc;
1082
1083 use crate::utils::bls12_381 as bls;
1084 use std::net::Ipv4Addr;
1085
1086 let sk = bls::generate_sk();
1087 let pk = bls::get_public_key(&sk).expect("pk");
1088 let pop = bls::sign(&sk, &*pk, consensus::DST_POP).expect("pop");
1089
1090 let unique_id = format!("{}_{}", std::process::id(), utils::misc::get_unix_nanos_now());
1091 let config = config::Config {
1092 work_folder: format!("/tmp/test_convenience_{}", unique_id),
1093 version: Ver::new(1, 2, 3),
1094 offline: false,
1095 http_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1096 http_port: 3000,
1097 udp_ipv4: Ipv4Addr::new(127, 0, 0, 1),
1098 udp_port: 36969,
1099 public_ipv4: Some("127.0.0.1".to_string()),
1100 seed_ips: Vec::new(),
1101 seed_anrs: Vec::new(),
1102 other_nodes: Vec::new(),
1103 trust_factor: 0.8,
1104 max_peers: 100,
1105 trainer_sk: sk,
1106 trainer_pk: pk,
1107 trainer_pk_b58: String::new(),
1108 trainer_pop: pop.to_vec(),
1109 archival_node: false,
1110 autoupdate: false,
1111 computor_type: None,
1112 snapshot_height: 0,
1113 anr: None,
1114 anr_desc: None,
1115 anr_name: None,
1116 };
1117 let socket = Arc::new(MockSocket::new());
1118
1119 match Context::with_config_and_socket(config, socket).await {
1120 Ok(context) => {
1121 let mut buf = [0u8; 1024];
1122 let target: Ipv4Addr = "127.0.0.1".parse().unwrap();
1123
1124 let pong = Protocol::PingReply(PingReply { ts_m: 1234567890, seen_time: 1234567890123 });
1125 match context.send_message_to(&pong, target).await {
1127 Ok(_) => {
1128 }
1130 Err(_) => {
1131 }
1133 }
1134
1135 match context.recv_from(&mut buf).await {
1137 Ok(_) => {
1138 }
1140 Err(_) => {
1141 }
1143 }
1144 }
1145 Err(_) => {
1146 }
1148 }
1149 }
1150}