1use crate::config::{Config, SeedANR};
2use crate::utils::blake3;
3use crate::utils::bls12_381::{sign, verify};
4use crate::utils::misc::get_unix_secs_now;
5use crate::utils::version::Ver;
6use crate::utils::{Hash, PublicKey};
7use amadeus_utils::B3f4;
8use amadeus_utils::vecpak;
9use once_cell::sync::Lazy;
10use std::collections::HashMap;
11use std::net::Ipv4Addr;
12use std::sync::Arc;
13use tokio::sync::RwLock;
/// Ceiling for the per-record `udp_packets` counter.
///
/// `NodeAnrs::is_within_udp_limit` reports a peer as within limits only while its
/// counter stays below this value, and `NodeAnrs::update_rate_limiting_counters`
/// subtracts half of it from every counter each time it runs.
const UDP_PACKETS_LIMIT: u64 = 40_000;
15
16pub static PROTO_RATE_LIMITS: Lazy<HashMap<&'static str, u64>> = Lazy::new(|| {
17 use crate::consensus::doms::attestation::EventAttestation;
18 use crate::consensus::doms::entry::Entry;
19 use crate::node::protocol::{
20 Catchup, CatchupReply, EventTip, EventTx, GetPeerAnrs, GetPeerAnrsReply, NewPhoneWhoDis, NewPhoneWhoDisReply,
21 Ping, PingReply,
22 };
23
24 [
25 (Ping::TYPENAME, 30),
26 (PingReply::TYPENAME, 30),
27 (EventTip::TYPENAME, 30),
28 (EventTx::TYPENAME, 8000),
29 (GetPeerAnrs::TYPENAME, 10),
30 (GetPeerAnrsReply::TYPENAME, 10),
31 (NewPhoneWhoDis::TYPENAME, 20),
32 (NewPhoneWhoDisReply::TYPENAME, 20),
33 (Catchup::TYPENAME, 20),
36 (CatchupReply::TYPENAME, 20),
37 (Entry::TYPENAME, 30),
38 (EventAttestation::TYPENAME, 8000),
39 ]
40 .into_iter()
41 .collect()
42});
43
/// Failures produced while building, parsing or validating an [`Anr`].
#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
pub enum Error {
    /// Returned by `Anr::verify_and_unpack` when the record's `ts` lies more than
    /// 3600 seconds ahead of the local clock.
    #[error("Invalid timestamp: ANR is from the future")]
    InvalidTimestamp,
    /// Returned by `Anr::verify_and_unpack` when the vecpak encoding of the record
    /// exceeds 390 bytes; carries the actual encoded size.
    #[error("ANR too large: {0} bytes (max 390)")]
    TooLarge(usize),
    /// Returned by `Anr::verify_and_unpack` when `Anr::verify_signature` fails.
    #[error("Invalid signature")]
    InvalidSignature,
    /// Returned by `Anr::unpack` when the record's port is not 36969; carries the
    /// offending port.
    #[error("Invalid port: {0} (expected 36969)")]
    InvalidPort(u16),
    /// A BLS error, converted automatically (e.g. from `sign` when building a record).
    #[error(transparent)]
    Bls(#[from] crate::utils::bls12_381::Error),
    /// Returned by `Anr::from_vecpak_map` when a field is missing or malformed;
    /// carries a static tag naming the field or the failed step.
    #[error("parse error: {0}")]
    ParseError(&'static str),
}
59
60impl crate::utils::misc::Typename for Error {
61 fn typename(&self) -> &'static str {
62 self.into()
63 }
64}
65
/// A node record: the signed fields a node advertises plus local bookkeeping.
///
/// Only `ip4`, `pk`, `pop`, `port`, `signature`, `ts`, `version`, `anr_name` and
/// `anr_desc` take part in the serde and vecpak encodings in this file; every other
/// field is local state that the constructors and `pack`/`unpack` reset.
#[derive(Debug, Clone, PartialEq, bincode::Encode, bincode::Decode)]
#[allow(non_snake_case)]
pub struct Anr {
    /// Advertised IPv4 address (encoded as its dotted-decimal string).
    pub ip4: Ipv4Addr,
    /// Node public key; also the key under which `NodeAnrs` stores the record.
    pub pk: PublicKey,
    /// Proof of possession: checked in `verify_signature` as a signature by `pk`
    /// over the bytes of `pk` under `DST_POP`.
    pub pop: Vec<u8>,
    /// Advertised port; `build_with_name_desc` always writes 36969 and `unpack`
    /// rejects anything else.
    pub port: u16,
    /// Signature by `pk` over the vecpak encoding of the wire fields except
    /// `signature` itself, under `DST_ANR`.
    pub signature: Vec<u8>,
    /// Creation time in Unix seconds; `NodeAnrs::insert` only replaces a stored
    /// record when the incoming `ts` is strictly greater.
    pub ts: u32,
    /// Version reported by the node (encoded via its string form).
    pub version: Ver,
    /// Optional name; omitted from the encodings when `None`.
    pub anr_name: Option<String>,
    /// Optional description; omitted from the encodings when `None`.
    pub anr_desc: Option<String>,
    /// Local flag toggled by `NodeAnrs::set_handshaked` / `unset_handshaked`.
    pub handshaked: bool,
    /// Local flag; `NodeAnrs::insert` always forces it to `false`.
    #[allow(non_snake_case)]
    pub hasChainPop: bool,
    /// Local error note; cleared when a record is first inserted or its endpoint changes.
    /// NOTE(review): presumably the last connection-check failure - confirm with callers.
    pub error: Option<String>,
    /// Local counter reset to 0 together with `error`.
    pub error_tries: u32,
    /// Local Unix-seconds value; set to "now + 3" when a record is built, first
    /// inserted, or changes endpoint.
    /// NOTE(review): presumably when the peer is next probed - confirm with callers.
    pub next_check: u32,
    /// blake3 hash of `pk`.
    pub pk_b3: Hash,
    /// `B3f4` value derived from `pk_b3`.
    pub pk_b3_f4: B3f4,
    /// Per-message-type request counters used by `NodeAnrs::is_within_proto_limit`.
    pub proto_reqs: HashMap<String, u64>,
    /// UDP packet counter used by `NodeAnrs::is_within_udp_limit`.
    pub udp_packets: u64,
}
91
92impl serde::Serialize for Anr {
93 fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
94 where
95 S: serde::Serializer,
96 {
97 use serde::ser::SerializeMap;
98 let mut map = serializer.serialize_map(None)?;
99 map.serialize_entry("pk", &serde_bytes::Bytes::new(self.pk.as_ref()))?;
100 map.serialize_entry("ts", &self.ts)?;
101 map.serialize_entry("ip4", &self.ip4.to_string())?;
102 map.serialize_entry("pop", &serde_bytes::Bytes::new(self.pop.as_ref()))?;
103 map.serialize_entry("port", &self.port)?;
104 map.serialize_entry("version", &self.version.to_string())?;
105 map.serialize_entry("signature", &serde_bytes::Bytes::new(self.signature.as_ref()))?;
106 if let Some(ref name) = self.anr_name {
107 map.serialize_entry("anr_name", name)?;
108 }
109 if let Some(ref desc) = self.anr_desc {
110 map.serialize_entry("anr_desc", desc)?;
111 }
112 map.end()
113 }
114}
115
116impl<'de> serde::Deserialize<'de> for Anr {
117 fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
118 where
119 D: serde::Deserializer<'de>,
120 {
121 use serde::de::{MapAccess, Visitor};
122 use std::fmt;
123
124 struct AnrVisitor;
125
126 impl<'de> Visitor<'de> for AnrVisitor {
127 type Value = Anr;
128
129 fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
130 formatter.write_str("struct Anr")
131 }
132
133 fn visit_map<V>(self, mut map: V) -> Result<Anr, V::Error>
134 where
135 V: MapAccess<'de>,
136 {
137 let mut pk: Option<Vec<u8>> = None;
138 let mut ts = None;
139 let mut ip4_str: Option<String> = None;
140 let mut pop = None;
141 let mut port = None;
142 let mut version_str: Option<String> = None;
143 let mut signature = None;
144 let mut anr_name = None;
145 let mut anr_desc = None;
146
147 while let Some(key) = map.next_key::<String>()? {
148 match key.as_str() {
149 "pk" => {
150 pk = Some(map.next_value::<serde_bytes::ByteBuf>()?.into_vec());
151 }
152 "ts" => ts = Some(map.next_value()?),
153 "ip4" => ip4_str = Some(map.next_value()?),
154 "pop" => pop = Some(map.next_value::<serde_bytes::ByteBuf>()?.into_vec()),
155 "port" => port = Some(map.next_value()?),
156 "version" => version_str = Some(map.next_value()?),
157 "signature" => signature = Some(map.next_value::<serde_bytes::ByteBuf>()?.into_vec()),
158 "anr_name" => anr_name = Some(map.next_value()?),
159 "anr_desc" => anr_desc = Some(map.next_value()?),
160 _ => {
161 let _ = map.next_value::<serde::de::IgnoredAny>()?;
162 }
163 }
164 }
165
166 let pk_vec = pk.ok_or_else(|| serde::de::Error::missing_field("pk"))?;
167 let pk_array: PublicKey =
168 pk_vec.try_into().map_err(|_| serde::de::Error::custom("pk must be 48 bytes"))?;
169 let ts = ts.ok_or_else(|| serde::de::Error::missing_field("ts"))?;
170 let ip4_str = ip4_str.ok_or_else(|| serde::de::Error::missing_field("ip4"))?;
171 let ip4: Ipv4Addr = ip4_str.parse().map_err(|_| serde::de::Error::custom("invalid IPv4 address"))?;
172 let pop = pop.ok_or_else(|| serde::de::Error::missing_field("pop"))?;
173 let port = port.ok_or_else(|| serde::de::Error::missing_field("port"))?;
174 let version_str = version_str.ok_or_else(|| serde::de::Error::missing_field("version"))?;
175 let version =
176 Ver::try_from(version_str.as_str()).map_err(|_| serde::de::Error::custom("invalid version"))?;
177 let signature = signature.ok_or_else(|| serde::de::Error::missing_field("signature"))?;
178
179 let pk_b3 = Hash::from(blake3::hash(pk_array.as_ref()));
180 let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
181
182 Ok(Anr {
183 ip4,
184 pk: pk_array,
185 pop,
186 port,
187 signature,
188 ts,
189 version,
190 anr_name,
191 anr_desc,
192 handshaked: false,
193 hasChainPop: false,
194 error: None,
195 error_tries: 0,
196 next_check: 0,
197 pk_b3,
198 pk_b3_f4,
199 proto_reqs: HashMap::new(),
200 udp_packets: 0,
201 })
202 }
203 }
204
205 deserializer.deserialize_map(AnrVisitor)
206 }
207}
208
209impl From<SeedANR> for Anr {
210 fn from(seed: SeedANR) -> Self {
211 let pk_b3 = Hash::from(blake3::hash(seed.pk.as_ref()));
213 let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
214
215 Anr {
216 ip4: seed.ip4.parse().unwrap_or(Ipv4Addr::new(0, 0, 0, 0)),
217 pk: seed.pk,
218 pop: vec![0u8; 96], port: seed.port,
220 signature: seed.signature,
221 ts: seed.ts,
222 version: seed.version,
223 anr_name: None,
224 anr_desc: None,
225 handshaked: false,
226 hasChainPop: false,
227 error: None,
228 error_tries: 0,
229 next_check: seed.ts + 3,
230 pk_b3,
231 pk_b3_f4,
232 proto_reqs: HashMap::new(),
233 udp_packets: 0,
234 }
235 }
236}
237
238impl Anr {
239 pub fn from_config(config: &Config) -> Result<Self, Error> {
241 Self::build_with_name_desc(
242 &config.get_sk(),
243 &config.get_pk(),
244 &config.get_pop(),
245 config.get_public_ipv4(),
246 config.get_ver(),
247 config.anr_name.clone(),
248 config.anr_desc.clone(),
249 )
250 }
251
252 pub fn build(sk: &[u8], pk: &PublicKey, pop: &[u8], ip4: Ipv4Addr, version: Ver) -> Result<Self, Error> {
253 Self::build_with_name_desc(sk, pk, pop, ip4, version, None, None)
254 }
255
256 pub fn build_with_name_desc(
257 sk: &[u8],
258 pk: &PublicKey,
259 pop: &[u8],
260 ip4: Ipv4Addr,
261 version: Ver,
262 anr_name: Option<String>,
263 anr_desc: Option<String>,
264 ) -> Result<Self, Error> {
265 let ts_s = get_unix_secs_now();
266
267 let pk_b3 = Hash::from(blake3::hash(pk.as_ref()));
268 let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
269
270 let mut anr = Anr {
271 ip4,
272 pk: *pk,
273 pop: pop.to_vec(),
274 port: 36969,
275 ts: ts_s,
276 version,
277 anr_name,
278 anr_desc,
279 signature: vec![],
280 handshaked: false,
281 hasChainPop: false,
282 error: None,
283 error_tries: 0,
284 next_check: ts_s + 3,
285 pk_b3,
286 pk_b3_f4,
287 proto_reqs: HashMap::new(),
288 udp_packets: 0,
289 };
290
291 let to_sign = anr.to_vecpak_for_signing();
293 let dst = crate::consensus::DST_ANR;
294 let sig_array = sign(sk, &to_sign, dst)?;
295 anr.signature = sig_array.to_vec();
296
297 Ok(anr)
298 }
299
300 pub fn from_vecpak_map(map: amadeus_utils::vecpak::PropListMap) -> Result<Self, Error> {
302 let ip4_str = map.get_string(b"ip4").ok_or(Error::ParseError("ip4"))?;
304 let ip4 = ip4_str.parse::<Ipv4Addr>().map_err(|_| Error::ParseError("ip4_parse"))?;
305
306 let pk = map.get_binary::<PublicKey>(b"pk").ok_or(Error::ParseError("pk"))?;
307 let pop = map.get_binary::<Vec<u8>>(b"pop").ok_or(Error::ParseError("pop"))?;
308 let port = map.get_integer::<u16>(b"port").ok_or(Error::ParseError("port"))?;
309 let signature = map.get_binary::<Vec<u8>>(b"signature").ok_or(Error::ParseError("signature"))?;
310
311 let ts = map
313 .get_integer::<u32>(b"ts")
314 .or_else(|| map.get_integer::<u64>(b"ts").map(|v| v as u32))
315 .ok_or(Error::ParseError("ts"))?;
316
317 let version_bytes = map.get_binary::<Vec<u8>>(b"version").ok_or(Error::ParseError("version"))?;
318 let version_str = String::from_utf8_lossy(&version_bytes);
319 let version = Ver::try_from(version_str.as_ref()).map_err(|_| Error::ParseError("invalid_version_format"))?;
320
321 let anr_name = map
323 .get_binary::<Vec<u8>>(b"anr_name")
324 .and_then(|bytes| String::from_utf8(bytes).ok())
325 .filter(|s| !s.is_empty());
326
327 let anr_desc = map
328 .get_binary::<Vec<u8>>(b"anr_desc")
329 .and_then(|bytes| String::from_utf8(bytes).ok())
330 .filter(|s| !s.is_empty());
331
332 let pk_b3 = Hash::from(blake3::hash(pk.as_ref()));
334 let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
335
336 Ok(Self {
337 ip4,
338 pk,
339 pop,
340 port,
341 signature,
342 ts,
343 version,
344 anr_name,
345 anr_desc,
346 handshaked: false,
347 hasChainPop: false,
348 error: None,
349 error_tries: 0,
350 next_check: 0,
351 pk_b3,
352 pk_b3_f4,
353 proto_reqs: HashMap::new(),
354 udp_packets: 0,
355 })
356 }
357
358 pub fn verify_signature(&self) -> bool {
360 if verify(&*self.pk, &self.pop, &*self.pk, crate::consensus::DST_POP).is_err() {
363 return false;
364 }
365
366 let to_sign = self.to_vecpak_for_signing();
368 if verify(&*self.pk, &self.signature, &to_sign, crate::consensus::DST_ANR).is_err() {
369 return false;
370 }
371
372 true
373 }
374
375 pub fn verify_and_unpack(anr: Anr) -> Result<Anr, Error> {
377 let now_s = get_unix_secs_now();
378 if (anr.ts as i64) - (now_s as i64) > 3600 {
379 return Err(Error::InvalidTimestamp);
380 }
381
382 let packed_anr = anr.pack();
384 let serialized = amadeus_utils::vecpak::encode(packed_anr.to_vecpak_term());
385 if serialized.len() > 390 {
386 return Err(Error::TooLarge(serialized.len()));
387 }
388
389 if !anr.verify_signature() {
391 return Err(Error::InvalidSignature);
392 }
393
394 Ok(packed_anr)
395 }
396
397 pub fn to_vecpak_for_signing(&self) -> Vec<u8> {
398 use amadeus_utils::vecpak::encode;
400 let term = self.to_vecpak_term_without_signature();
401 encode(term)
402 }
403
404 pub fn to_vecpak_term(&self) -> vecpak::Term {
405 let mut pairs = self.to_vecpak_term_without_signature();
406 if let vecpak::Term::PropList(ref mut p) = pairs {
407 p.push((vecpak::Term::Binary(b"signature".to_vec()), vecpak::Term::Binary(self.signature.clone())));
408 }
409 pairs
410 }
411
412 fn to_vecpak_term_without_signature(&self) -> vecpak::Term {
413 let mut pairs = Vec::new();
414
415 if let Some(desc) = &self.anr_desc {
419 pairs.push((vecpak::Term::Binary(b"anr_desc".to_vec()), vecpak::Term::Binary(desc.as_bytes().to_vec())));
420 }
421
422 if let Some(name) = &self.anr_name {
423 pairs.push((vecpak::Term::Binary(b"anr_name".to_vec()), vecpak::Term::Binary(name.as_bytes().to_vec())));
424 }
425
426 pairs.push((
427 vecpak::Term::Binary(b"ip4".to_vec()),
428 vecpak::Term::Binary(self.ip4.to_string().as_bytes().to_vec()),
429 ));
430 pairs.push((vecpak::Term::Binary(b"pk".to_vec()), vecpak::Term::Binary(self.pk.to_vec())));
431 pairs.push((vecpak::Term::Binary(b"pop".to_vec()), vecpak::Term::Binary(self.pop.clone())));
432 pairs.push((vecpak::Term::Binary(b"port".to_vec()), vecpak::Term::VarInt(self.port as i128)));
433 pairs.push((vecpak::Term::Binary(b"ts".to_vec()), vecpak::Term::VarInt(self.ts as i128)));
434 pairs.push((
435 vecpak::Term::Binary(b"version".to_vec()),
436 vecpak::Term::Binary(self.version.to_string().as_bytes().to_vec()),
437 ));
438
439 vecpak::Term::PropList(pairs)
440 }
441
442 pub fn unpack(anr: Anr) -> Result<Anr, Error> {
444 if anr.port == 36969 {
445 let pk_b3 = Hash::from(blake3::hash(anr.pk.as_ref()));
447 let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
448
449 Ok(Anr {
450 ip4: anr.ip4,
451 pk: anr.pk,
452 pop: anr.pop,
453 port: anr.port,
454 signature: anr.signature,
455 ts: anr.ts,
456 version: anr.version,
457 anr_name: anr.anr_name,
458 anr_desc: anr.anr_desc,
459 handshaked: false,
460 hasChainPop: false,
461 pk_b3,
462 pk_b3_f4,
463 error: None,
464 error_tries: 0,
465 next_check: 0,
466 proto_reqs: HashMap::new(),
467 udp_packets: 0,
468 })
469 } else {
470 Err(Error::InvalidPort(anr.port))
471 }
472 }
473
474 pub fn pack(&self) -> Anr {
476 Anr {
477 ip4: self.ip4.clone(),
478 pk: self.pk,
479 pop: self.pop.clone(),
480 port: self.port,
481 signature: self.signature.clone(),
482 ts: self.ts,
483 version: self.version.clone(),
484 anr_name: self.anr_name.clone(),
485 anr_desc: self.anr_desc.clone(),
486 handshaked: false,
487 hasChainPop: false,
488 pk_b3: self.pk_b3,
489 pk_b3_f4: self.pk_b3_f4,
490 error: None,
491 error_tries: 0,
492 next_check: 0,
493 proto_reqs: HashMap::new(),
494 udp_packets: 0,
495 }
496 }
497}
498
/// Registry of known node records, keyed by public key.
///
/// The map sits behind `Arc<RwLock<..>>`, so cloning a `NodeAnrs` yields another
/// handle to the same underlying store.
#[derive(Debug, Clone)]
pub struct NodeAnrs {
    // Shared map from public key to that node's record.
    store: Arc<RwLock<HashMap<PublicKey, Anr>>>,
}
505
506impl NodeAnrs {
507 pub fn new() -> Self {
509 Self { store: Arc::new(RwLock::new(HashMap::new())) }
510 }
511
512 pub fn default() -> Self {
514 Self::new()
515 }
516
517 pub async fn insert(&self, mut anr: Anr) {
519 anr.hasChainPop = false; let pk = anr.pk;
524
525 let mut map = self.store.write().await;
527 map.entry(pk.clone())
528 .and_modify(|old| {
529 if anr.ts > old.ts {
531 let same_ip4_port = old.ip4 == anr.ip4 && old.port == anr.port;
533 if !same_ip4_port {
534 anr.handshaked = false;
536 anr.error = None;
537 anr.error_tries = 0;
538 anr.next_check = get_unix_secs_now() + 3;
539 } else {
540 anr.handshaked = old.handshaked;
542 }
543 *old = anr.clone();
544 }
545 })
546 .or_insert_with(|| {
547 anr.handshaked = false;
549 anr.error = None;
550 anr.error_tries = 0;
551 anr.next_check = get_unix_secs_now() + 3;
552 anr
553 });
554 }
555
556 pub async fn get(&self, pk: &[u8]) -> Option<Anr> {
558 let map = self.store.read().await;
559 map.get(pk).cloned()
560 }
561
562 pub async fn get_by_ip4(&self, ip4: Ipv4Addr) -> Option<Anr> {
564 let map = self.store.read().await;
565 map.values().find(|anr| anr.ip4 == ip4).cloned()
566 }
567
568 pub async fn get_all_b3f4(&self) -> Vec<B3f4> {
570 let map = self.store.read().await;
571 let anrs: Vec<B3f4> = map.values().cloned().map(|a| a.pk_b3_f4).collect();
572 anrs
573 }
574
575 pub async fn get_all(&self) -> Vec<Anr> {
577 let map = self.store.read().await;
578 let anrs: Vec<Anr> = map.values().cloned().collect();
579 anrs
580 }
581
582 pub async fn is_within_udp_limit(&self, ip4: Ipv4Addr) -> Option<bool> {
584 let mut map = self.store.write().await;
585 if let Some(anr) = map.values_mut().find(|anr| anr.ip4 == ip4) {
586 anr.udp_packets += 1;
587 return Some(anr.udp_packets < UDP_PACKETS_LIMIT);
588 }
589 None
590 }
591
592 pub async fn is_within_proto_limit(&self, pk: &[u8], typename: &str) -> Option<bool> {
594 let mut map = self.store.write().await;
595 if let Some(anr) = map.get_mut(pk)
596 && let Some(limit) = PROTO_RATE_LIMITS.get(typename)
597 {
598 let counter = anr.proto_reqs.entry(typename.to_string()).or_insert(0);
599 *counter += 1;
600 return Some(*counter < *limit);
601 }
602 None
603 }
604
605 pub async fn update_rate_limiting_counters(&self) {
607 let mut map = self.store.write().await;
608 for anr in map.values_mut() {
609 anr.udp_packets = anr.udp_packets.saturating_sub(UDP_PACKETS_LIMIT / 2);
610 for (typename, limit) in PROTO_RATE_LIMITS.iter() {
612 if let Some(counter) = anr.proto_reqs.get_mut(*typename) {
613 *counter = counter.saturating_sub(*limit / 2);
614 }
615 }
616 }
617 }
618
619 pub async fn unset_handshaked(&self, pk: &[u8]) {
621 let mut map = self.store.write().await;
622 if let Some(anr) = map.get_mut(pk) {
623 anr.handshaked = false;
624 }
625 }
626
627 pub async fn set_handshaked(&self, pk: &[u8]) {
629 let mut map = self.store.write().await;
630 if let Some(anr) = map.get_mut(pk) {
631 anr.handshaked = true;
632 }
633 }
634
635 pub async fn handshaked(&self) -> Vec<PublicKey> {
637 let map = self.store.read().await;
638 let mut pks = Vec::new();
639 for (k, v) in map.iter() {
640 if v.handshaked {
641 pks.push(k.clone());
642 }
643 }
644 pks
645 }
646
647 pub async fn get_all_not_handshaked_ip4(&self) -> Vec<Ipv4Addr> {
649 let map = self.store.read().await;
650 let mut results = Vec::new();
651
652 for (_, v) in map.iter() {
653 if !v.handshaked {
654 results.push(v.ip4);
655 }
656 }
657
658 results
659 }
660
661 pub async fn is_handshaked(&self, pk: &[u8]) -> bool {
663 let map = self.store.read().await;
664 map.get(pk).map(|v| v.handshaked).unwrap_or(false)
665 }
666
667 pub async fn handshaked_and_valid_ip4(&self, pk: &[u8], ip4: &Ipv4Addr) -> bool {
669 let map = self.store.read().await;
670 map.get(pk).map(|v| v.handshaked && v.ip4 == *ip4).unwrap_or(false)
671 }
672
673 pub async fn get_random_verified(&self, count: usize) -> Vec<Ipv4Addr> {
675 use rand::seq::IndexedRandom;
676 use std::collections::HashSet;
677
678 let mut seen_ips = HashSet::new();
680 let mut unique_pairs = Vec::new();
681 for ip4 in self.get_all_handshaked_ip4().await {
682 if seen_ips.insert(ip4) {
683 unique_pairs.push(ip4);
684 }
685 }
686
687 let mut rng = rand::rng();
688 let selected: Vec<_> = unique_pairs.choose_multiple(&mut rng, count).cloned().collect();
689
690 selected
691 }
692
693 pub async fn get_random_not_verified(&self, count: usize) -> Vec<Ipv4Addr> {
695 use rand::seq::IndexedRandom;
696 use std::collections::HashSet;
697
698 let mut seen_ips = HashSet::new();
700 let mut unique_pairs = Vec::new();
701 for ip4 in self.get_all_not_handshaked_ip4().await {
702 if seen_ips.insert(ip4) {
703 unique_pairs.push(ip4);
704 }
705 }
706
707 let mut rng = rand::rng();
708 let selected: Vec<_> = unique_pairs.choose_multiple(&mut rng, count).cloned().collect();
709
710 selected
711 }
712
713 pub async fn get_all_excluding_b3f4(&self, b3f4: &[B3f4]) -> Vec<Anr> {
715 let map = self.store.read().await;
716 let mut results = Vec::new();
717 for (_, v) in map.iter() {
718 if !b3f4.contains(&v.pk_b3_f4) {
719 results.push(v.clone());
720 }
721 }
722 results
723 }
724
725 pub async fn get_all_handshaked_ip4(&self) -> Vec<Ipv4Addr> {
727 let map = self.store.read().await;
728 let mut results = Vec::new();
729
730 for (_, v) in map.iter() {
731 if v.handshaked {
732 results.push(v.ip4);
733 }
734 }
735
736 results
737 }
738
739 pub async fn by_pks_ip<T: AsRef<[u8]>>(&self, pks: &[T]) -> Vec<Ipv4Addr> {
741 let pk_set: std::collections::HashSet<PublicKey> = pks
743 .iter()
744 .filter_map(|p| {
745 let bytes = p.as_ref();
746 if bytes.len() == 48 {
747 let mut array = [0u8; 48];
748 array.copy_from_slice(bytes);
749 Some(PublicKey::from(array))
750 } else {
751 None
752 }
753 })
754 .collect();
755 let mut ips = Vec::new();
756
757 let map = self.store.read().await;
758 for v in map.values() {
759 if pk_set.contains(&v.pk) {
760 ips.push(v.ip4);
761 }
762 }
763
764 ips
765 }
766
767 pub async fn seed(&self, config: &Config) {
769 for anr in config.seed_anrs.iter().cloned().map(Into::<Anr>::into) {
770 self.insert(anr).await;
771 }
772
773 if let Ok(my_anr) = Anr::from_config(config) {
774 self.insert(my_anr).await;
775 self.set_handshaked(config.get_pk().as_ref()).await;
776 }
777 }
778}
779
780#[cfg(test)]
781mod tests {
782 use super::*;
783 use amadeus_utils::bls12_381;
784
785 impl NodeAnrs {
786 pub async fn clear_all(&self) {
787 self.store.write().await.clear()
788 }
789
790 pub async fn count(&self) -> usize {
791 self.store.read().await.len()
792 }
793 }
794
795 #[tokio::test]
796 async fn test_anr_operations() {
797 let registry = NodeAnrs::new();
798
799 let _sk = [1; 32];
801 let mut pk = [2; 48];
802 let pid_bytes = std::process::id().to_le_bytes();
803 let now_ns_bytes = crate::utils::misc::get_unix_nanos_now().to_le_bytes();
804 pk[..4].copy_from_slice(&pid_bytes);
805 pk[4..12].copy_from_slice(&now_ns_bytes[..8]);
806
807 let pop = vec![3; 96];
808 let ip4 = Ipv4Addr::new(127, 0, 0, 1);
809 let version = Ver::new(1, 0, 0);
810
811 let pk_b3 = blake3::hash(&pk);
813 let pk_b3_f4 = B3f4::new(&pk_b3);
814
815 let pk_wrapped = PublicKey::new(pk);
816 let anr = Anr {
817 ip4,
818 pk: pk_wrapped,
819 pop,
820 port: 36969,
821 signature: vec![0; 96],
822 ts: 1234567890,
823 version,
824 anr_name: None,
825 anr_desc: None,
826 handshaked: false,
827 hasChainPop: false,
828 error: None,
829 error_tries: 0,
830 next_check: 1234567893,
831 pk_b3: Hash::new(pk_b3.into()),
832 pk_b3_f4,
833 proto_reqs: HashMap::new(),
834 udp_packets: 0,
835 };
836
837 registry.insert(anr.clone()).await;
839
840 let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
842 assert_eq!(&*retrieved.pk, &pk);
843 assert!(!retrieved.handshaked, "Expected handshaked to be false after insert, got true");
844
845 registry.set_handshaked(pk_wrapped.as_ref()).await;
847 let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
848 assert!(retrieved.handshaked, "Expected handshaked to be true after set_handshaked");
849
850 let handshaked_pks = registry.handshaked().await;
852 assert!(handshaked_pks.iter().any(|p| &**p == &pk), "pk should be in handshaked list");
853
854 assert!(registry.is_handshaked(pk_wrapped.as_ref()).await, "is_handshaked should return true");
856
857 let all = registry.get_all().await;
859 assert!(!all.is_empty());
860 assert!(all.iter().any(|a| &*a.pk == &pk));
861
862 let total_count = registry.count().await;
864 assert!(total_count >= 1, "Expected at least 1 ANR, got {}", total_count);
865
866 registry.clear_all().await;
868
869 assert!(registry.get(pk_wrapped.as_ref()).await.is_none(), "Our pk should be removed");
871 }
872
873 #[tokio::test]
874 async fn test_anr_update() {
875 let registry = NodeAnrs::new();
876
877 let mut pk = [1; 48];
879 let pid_bytes = std::process::id().to_le_bytes();
880 let now_ns_bytes = crate::utils::misc::get_unix_nanos_now().to_le_bytes();
881 pk[..4].copy_from_slice(&pid_bytes);
882 pk[4..12].copy_from_slice(&now_ns_bytes[..8]);
883 let pop = vec![2; 96];
884 let ip4 = Ipv4Addr::new(192, 168, 1, 1);
885 let version = Ver::new(1, 0, 0);
886
887 let pk_b3 = blake3::hash(&pk);
889 let pk_b3_f4 = B3f4::new(&pk_b3);
890
891 let pk_wrapped = PublicKey::new(pk);
892 let anr1 = Anr {
894 ip4,
895 pk: pk_wrapped,
896 pop: pop.clone(),
897 port: 36969,
898 signature: vec![0; 96],
899 ts: 1000,
900 version: version.clone(),
901 anr_name: None,
902 anr_desc: None,
903 handshaked: true,
904 hasChainPop: false,
905 error: None,
906 error_tries: 0,
907 next_check: 1003,
908 pk_b3: Hash::new(pk_b3.into()),
909 pk_b3_f4,
910 proto_reqs: HashMap::new(),
911 udp_packets: 0,
912 };
913 registry.insert(anr1).await;
914 registry.set_handshaked(pk_wrapped.as_ref()).await;
915
916 let anr2 = Anr {
918 ip4: Ipv4Addr::new(10, 0, 0, 1),
919 pk: pk_wrapped,
920 pop: pop.clone(),
921 port: 36969,
922 signature: vec![0; 96],
923 ts: 999,
924 version: version.clone(),
925 anr_name: None,
926 anr_desc: None,
927 handshaked: false,
928 hasChainPop: false,
929 error: None,
930 error_tries: 0,
931 next_check: 1002,
932 pk_b3: Hash::new(pk_b3.into()),
933 pk_b3_f4,
934 proto_reqs: HashMap::new(),
935 udp_packets: 0,
936 };
937 registry.insert(anr2).await;
938
939 let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
941 assert_eq!(retrieved.ip4, Ipv4Addr::new(192, 168, 1, 1));
942 assert_eq!(retrieved.ts, 1000);
943 assert!(retrieved.handshaked);
944
945 let anr3 = Anr {
947 ip4,
948 pk: pk_wrapped,
949 pop: pop.clone(),
950 port: 36969,
951 signature: vec![0; 96],
952 ts: 2000,
953 version: Ver::new(2, 0, 0),
954 anr_name: None,
955 anr_desc: None,
956 handshaked: false,
957 hasChainPop: false,
958 error: None,
959 error_tries: 0,
960 next_check: 2003,
961 pk_b3: Hash::new(pk_b3.into()),
962 pk_b3_f4,
963 proto_reqs: HashMap::new(),
964 udp_packets: 0,
965 };
966 registry.insert(anr3).await;
967
968 let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
969 assert_eq!(retrieved.ts, 2000);
970 assert_eq!(retrieved.version, Ver::new(2, 0, 0));
971 assert!(retrieved.handshaked); let anr4 = Anr {
975 ip4: Ipv4Addr::new(10, 0, 0, 1),
976 pk: pk_wrapped,
977 pop,
978 port: 36969,
979 signature: vec![0; 96],
980 ts: 3000,
981 version: Ver::new(3, 0, 0),
982 anr_name: None,
983 anr_desc: None,
984 handshaked: true,
985 hasChainPop: false,
986 error: Some("old error".to_string()),
987 error_tries: 5,
988 next_check: 3003,
989 pk_b3: Hash::new(pk_b3.into()),
990 pk_b3_f4,
991 proto_reqs: HashMap::new(),
992 udp_packets: 0,
993 };
994 registry.insert(anr4).await;
995
996 let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
997 assert_eq!(retrieved.ts, 3000);
998 assert_eq!(retrieved.ip4, Ipv4Addr::new(10, 0, 0, 1));
999 assert!(!retrieved.handshaked); assert_eq!(retrieved.error, None); assert_eq!(retrieved.error_tries, 0); registry.clear_all().await;
1005 }
1006
1007 #[tokio::test]
1008 async fn test_get_random_not_handshaked_multiple() {
1009 let registry = NodeAnrs::new();
1010
1011 for i in 1..=5 {
1013 let mut pk = [i as u8; 48];
1014 let now_ns_bytes = crate::utils::misc::get_unix_nanos_now().to_le_bytes();
1015 pk[40..48].copy_from_slice(&now_ns_bytes[..8]);
1016
1017 let pk_b3 = blake3::hash(&pk);
1019 let pk_b3_f4 = B3f4::new(&pk_b3);
1020
1021 let anr = Anr {
1022 ip4: Ipv4Addr::new(192, 168, 1, i), pk: PublicKey::new(pk),
1024 pop: vec![i as u8; 96],
1025 port: 36969,
1026 signature: vec![i as u8; 96],
1027 ts: 1000 + i as u32,
1028 version: Ver::new(1, 0, i as u8),
1029 anr_name: None,
1030 anr_desc: None,
1031 handshaked: false, hasChainPop: false,
1033 error: None,
1034 error_tries: 0,
1035 next_check: 2000,
1036 pk_b3: Hash::new(pk_b3.into()),
1037 pk_b3_f4,
1038 proto_reqs: HashMap::new(),
1039 udp_packets: 0,
1040 };
1041
1042 registry.insert(anr).await;
1043 }
1044
1045 for run in 1..=10 {
1047 let result = registry.get_random_not_verified(3).await;
1048
1049 assert_eq!(result.len(), 3, "Run {}: expected 3 results, got {}", run, result.len());
1051
1052 let mut ips = std::collections::HashSet::new();
1054 for ip in &result {
1055 assert!(ips.insert(*ip), "Run {}: duplicate IP found: {}", run, ip);
1056 }
1057 }
1058
1059 let result = registry.get_random_not_verified(10).await;
1061 assert_eq!(result.len(), 5, "Should return all 5 when asking for 10");
1062
1063 registry.clear_all().await;
1065 }
1066
1067 #[test]
1068 fn test_anr_vecpak_signature() {
1069 let sk = bls12_381::generate_sk();
1071 let pk = bls12_381::get_public_key(&sk).expect("get pk");
1072 let pop = bls12_381::sign(&sk, pk.as_ref(), crate::consensus::DST_POP).expect("sign pop");
1073
1074 let anr =
1076 Anr::build(&sk, &pk, pop.as_ref(), Ipv4Addr::new(127, 0, 0, 1), Ver::new(1, 2, 5)).expect("build anr");
1077
1078 assert!(anr.verify_signature(), "ANR signature should verify");
1080
1081 let vecpak_data = anr.to_vecpak_for_signing();
1083 println!("Vecpak encoded ANR (hex): {}", hex::encode(&vecpak_data));
1084 println!("Vecpak size: {} bytes", vecpak_data.len());
1085
1086 assert!(vecpak_data[0] <= 7, "Should be vecpak format, not ETF");
1089 }
1090
1091 #[test]
1092 fn test_anr_signature_compatibility() {
1093 let sk = bls12_381::generate_sk();
1097 let pk = bls12_381::get_public_key(&sk).expect("get pk");
1098 let pop = bls12_381::sign(&sk, pk.as_ref(), crate::consensus::DST_POP).expect("sign pop");
1099
1100 let anr =
1101 Anr::build(&sk, &pk, pop.as_ref(), Ipv4Addr::new(192, 168, 1, 1), Ver::new(1, 2, 3)).expect("build anr");
1102
1103 let to_sign = anr.to_vecpak_for_signing();
1105
1106 let decoded = vecpak::decode(&to_sign).expect("decode vecpak");
1108
1109 println!("Decoded vecpak term: {:?}", decoded);
1110
1111 match decoded {
1113 vecpak::Term::PropList(pairs) => {
1114 println!("ANR has {} fields", pairs.len());
1115 for (key, _value) in &pairs {
1116 if let vecpak::Term::Binary(key_bytes) = key {
1117 let key_str = String::from_utf8_lossy(key_bytes);
1118 println!("Field: {}", key_str);
1119 }
1120 }
1121 }
1122 _ => panic!("Expected PropList for ANR"),
1123 }
1124 }
1125}