Skip to main content

amadeus_node/node/
anr.rs

1use crate::config::{Config, SeedANR};
2use crate::utils::blake3;
3use crate::utils::bls12_381::{sign, verify};
4use crate::utils::misc::get_unix_secs_now;
5use crate::utils::version::Ver;
6use crate::utils::{Hash, PublicKey};
7use amadeus_utils::B3f4;
8use amadeus_utils::vecpak;
9use once_cell::sync::Lazy;
10use std::collections::HashMap;
11use std::net::Ipv4Addr;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14const UDP_PACKETS_LIMIT: u64 = 40_000;
15
16pub static PROTO_RATE_LIMITS: Lazy<HashMap<&'static str, u64>> = Lazy::new(|| {
17    use crate::consensus::doms::attestation::EventAttestation;
18    use crate::consensus::doms::entry::Entry;
19    use crate::node::protocol::{
20        Catchup, CatchupReply, EventTip, EventTx, GetPeerAnrs, GetPeerAnrsReply, NewPhoneWhoDis, NewPhoneWhoDisReply,
21        Ping, PingReply,
22    };
23
24    [
25        (Ping::TYPENAME, 30),
26        (PingReply::TYPENAME, 30),
27        (EventTip::TYPENAME, 30),
28        (EventTx::TYPENAME, 8000),
29        (GetPeerAnrs::TYPENAME, 10),
30        (GetPeerAnrsReply::TYPENAME, 10),
31        (NewPhoneWhoDis::TYPENAME, 20),
32        (NewPhoneWhoDisReply::TYPENAME, 20),
33        // (SpecialBusiness::TYPENAME, 200),
34        // (SpecialBusinessReply::TYPENAME, 200),
35        (Catchup::TYPENAME, 20),
36        (CatchupReply::TYPENAME, 20),
37        (Entry::TYPENAME, 30),
38        (EventAttestation::TYPENAME, 8000),
39    ]
40    .into_iter()
41    .collect()
42});
43
44#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
45pub enum Error {
46    #[error("Invalid timestamp: ANR is from the future")]
47    InvalidTimestamp,
48    #[error("ANR too large: {0} bytes (max 390)")]
49    TooLarge(usize),
50    #[error("Invalid signature")]
51    InvalidSignature,
52    #[error("Invalid port: {0} (expected 36969)")]
53    InvalidPort(u16),
54    #[error(transparent)]
55    Bls(#[from] crate::utils::bls12_381::Error),
56    #[error("parse error: {0}")]
57    ParseError(&'static str),
58}
59
60impl crate::utils::misc::Typename for Error {
61    fn typename(&self) -> &'static str {
62        self.into()
63    }
64}
65
66#[derive(Debug, Clone, PartialEq, bincode::Encode, bincode::Decode)]
67#[allow(non_snake_case)]
68pub struct Anr {
69    pub ip4: Ipv4Addr,
70    pub pk: PublicKey,
71    pub pop: Vec<u8>,
72    pub port: u16,
73    pub signature: Vec<u8>,
74    pub ts: u32,
75    pub version: Ver,
76    pub anr_name: Option<String>,
77    pub anr_desc: Option<String>,
78    // runtime fields
79    pub handshaked: bool,
80    #[allow(non_snake_case)]
81    pub hasChainPop: bool,
82    pub error: Option<String>,
83    pub error_tries: u32,
84    pub next_check: u32,
85    // Blake3 indexing fields (added in v1.1.8)
86    pub pk_b3: Hash,
87    pub pk_b3_f4: B3f4,
88    pub proto_reqs: HashMap<String, u64>,
89    pub udp_packets: u64,
90}
91
92impl serde::Serialize for Anr {
93    fn serialize<S>(&self, serializer: S) -> Result<S::Ok, S::Error>
94    where
95        S: serde::Serializer,
96    {
97        use serde::ser::SerializeMap;
98        let mut map = serializer.serialize_map(None)?;
99        map.serialize_entry("pk", &serde_bytes::Bytes::new(self.pk.as_ref()))?;
100        map.serialize_entry("ts", &self.ts)?;
101        map.serialize_entry("ip4", &self.ip4.to_string())?;
102        map.serialize_entry("pop", &serde_bytes::Bytes::new(self.pop.as_ref()))?;
103        map.serialize_entry("port", &self.port)?;
104        map.serialize_entry("version", &self.version.to_string())?;
105        map.serialize_entry("signature", &serde_bytes::Bytes::new(self.signature.as_ref()))?;
106        if let Some(ref name) = self.anr_name {
107            map.serialize_entry("anr_name", name)?;
108        }
109        if let Some(ref desc) = self.anr_desc {
110            map.serialize_entry("anr_desc", desc)?;
111        }
112        map.end()
113    }
114}
115
116impl<'de> serde::Deserialize<'de> for Anr {
117    fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
118    where
119        D: serde::Deserializer<'de>,
120    {
121        use serde::de::{MapAccess, Visitor};
122        use std::fmt;
123
124        struct AnrVisitor;
125
126        impl<'de> Visitor<'de> for AnrVisitor {
127            type Value = Anr;
128
129            fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
130                formatter.write_str("struct Anr")
131            }
132
133            fn visit_map<V>(self, mut map: V) -> Result<Anr, V::Error>
134            where
135                V: MapAccess<'de>,
136            {
137                let mut pk: Option<Vec<u8>> = None;
138                let mut ts = None;
139                let mut ip4_str: Option<String> = None;
140                let mut pop = None;
141                let mut port = None;
142                let mut version_str: Option<String> = None;
143                let mut signature = None;
144                let mut anr_name = None;
145                let mut anr_desc = None;
146
147                while let Some(key) = map.next_key::<String>()? {
148                    match key.as_str() {
149                        "pk" => {
150                            pk = Some(map.next_value::<serde_bytes::ByteBuf>()?.into_vec());
151                        }
152                        "ts" => ts = Some(map.next_value()?),
153                        "ip4" => ip4_str = Some(map.next_value()?),
154                        "pop" => pop = Some(map.next_value::<serde_bytes::ByteBuf>()?.into_vec()),
155                        "port" => port = Some(map.next_value()?),
156                        "version" => version_str = Some(map.next_value()?),
157                        "signature" => signature = Some(map.next_value::<serde_bytes::ByteBuf>()?.into_vec()),
158                        "anr_name" => anr_name = Some(map.next_value()?),
159                        "anr_desc" => anr_desc = Some(map.next_value()?),
160                        _ => {
161                            let _ = map.next_value::<serde::de::IgnoredAny>()?;
162                        }
163                    }
164                }
165
166                let pk_vec = pk.ok_or_else(|| serde::de::Error::missing_field("pk"))?;
167                let pk_array: PublicKey =
168                    pk_vec.try_into().map_err(|_| serde::de::Error::custom("pk must be 48 bytes"))?;
169                let ts = ts.ok_or_else(|| serde::de::Error::missing_field("ts"))?;
170                let ip4_str = ip4_str.ok_or_else(|| serde::de::Error::missing_field("ip4"))?;
171                let ip4: Ipv4Addr = ip4_str.parse().map_err(|_| serde::de::Error::custom("invalid IPv4 address"))?;
172                let pop = pop.ok_or_else(|| serde::de::Error::missing_field("pop"))?;
173                let port = port.ok_or_else(|| serde::de::Error::missing_field("port"))?;
174                let version_str = version_str.ok_or_else(|| serde::de::Error::missing_field("version"))?;
175                let version =
176                    Ver::try_from(version_str.as_str()).map_err(|_| serde::de::Error::custom("invalid version"))?;
177                let signature = signature.ok_or_else(|| serde::de::Error::missing_field("signature"))?;
178
179                let pk_b3 = Hash::from(blake3::hash(pk_array.as_ref()));
180                let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
181
182                Ok(Anr {
183                    ip4,
184                    pk: pk_array,
185                    pop,
186                    port,
187                    signature,
188                    ts,
189                    version,
190                    anr_name,
191                    anr_desc,
192                    handshaked: false,
193                    hasChainPop: false,
194                    error: None,
195                    error_tries: 0,
196                    next_check: 0,
197                    pk_b3,
198                    pk_b3_f4,
199                    proto_reqs: HashMap::new(),
200                    udp_packets: 0,
201                })
202            }
203        }
204
205        deserializer.deserialize_map(AnrVisitor)
206    }
207}
208
209impl From<SeedANR> for Anr {
210    fn from(seed: SeedANR) -> Self {
211        // Compute Blake3 hash fields for indexing
212        let pk_b3 = Hash::from(blake3::hash(seed.pk.as_ref()));
213        let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
214
215        Anr {
216            ip4: seed.ip4.parse().unwrap_or(Ipv4Addr::new(0, 0, 0, 0)),
217            pk: seed.pk,
218            pop: vec![0u8; 96], // seed anrs don't include pop in config
219            port: seed.port,
220            signature: seed.signature,
221            ts: seed.ts,
222            version: seed.version,
223            anr_name: None,
224            anr_desc: None,
225            handshaked: false,
226            hasChainPop: false,
227            error: None,
228            error_tries: 0,
229            next_check: seed.ts + 3,
230            pk_b3,
231            pk_b3_f4,
232            proto_reqs: HashMap::new(),
233            udp_packets: 0,
234        }
235    }
236}
237
238impl Anr {
239    // build a new anr with signature
240    pub fn from_config(config: &Config) -> Result<Self, Error> {
241        Self::build_with_name_desc(
242            &config.get_sk(),
243            &config.get_pk(),
244            &config.get_pop(),
245            config.get_public_ipv4(),
246            config.get_ver(),
247            config.anr_name.clone(),
248            config.anr_desc.clone(),
249        )
250    }
251
252    pub fn build(sk: &[u8], pk: &PublicKey, pop: &[u8], ip4: Ipv4Addr, version: Ver) -> Result<Self, Error> {
253        Self::build_with_name_desc(sk, pk, pop, ip4, version, None, None)
254    }
255
256    pub fn build_with_name_desc(
257        sk: &[u8],
258        pk: &PublicKey,
259        pop: &[u8],
260        ip4: Ipv4Addr,
261        version: Ver,
262        anr_name: Option<String>,
263        anr_desc: Option<String>,
264    ) -> Result<Self, Error> {
265        let ts_s = get_unix_secs_now();
266
267        let pk_b3 = Hash::from(blake3::hash(pk.as_ref()));
268        let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
269
270        let mut anr = Anr {
271            ip4,
272            pk: *pk,
273            pop: pop.to_vec(),
274            port: 36969,
275            ts: ts_s,
276            version,
277            anr_name,
278            anr_desc,
279            signature: vec![],
280            handshaked: false,
281            hasChainPop: false,
282            error: None,
283            error_tries: 0,
284            next_check: ts_s + 3,
285            pk_b3,
286            pk_b3_f4,
287            proto_reqs: HashMap::new(),
288            udp_packets: 0,
289        };
290
291        // create signature over vecpak format (matching Elixir's RDB.vecpak_encode)
292        let to_sign = anr.to_vecpak_for_signing();
293        let dst = crate::consensus::DST_ANR;
294        let sig_array = sign(sk, &to_sign, dst)?;
295        anr.signature = sig_array.to_vec();
296
297        Ok(anr)
298    }
299
300    /// Parse ANR from vecpak PropListMap (primary format)
301    pub fn from_vecpak_map(map: amadeus_utils::vecpak::PropListMap) -> Result<Self, Error> {
302        // ip4 is stored as string in elixir format: "127.0.0.1"
303        let ip4_str = map.get_string(b"ip4").ok_or(Error::ParseError("ip4"))?;
304        let ip4 = ip4_str.parse::<Ipv4Addr>().map_err(|_| Error::ParseError("ip4_parse"))?;
305
306        let pk = map.get_binary::<PublicKey>(b"pk").ok_or(Error::ParseError("pk"))?;
307        let pop = map.get_binary::<Vec<u8>>(b"pop").ok_or(Error::ParseError("pop"))?;
308        let port = map.get_integer::<u16>(b"port").ok_or(Error::ParseError("port"))?;
309        let signature = map.get_binary::<Vec<u8>>(b"signature").ok_or(Error::ParseError("signature"))?;
310
311        // handle timestamp - try u32 first, fallback to u64 for compatibility
312        let ts = map
313            .get_integer::<u32>(b"ts")
314            .or_else(|| map.get_integer::<u64>(b"ts").map(|v| v as u32))
315            .ok_or(Error::ParseError("ts"))?;
316
317        let version_bytes = map.get_binary::<Vec<u8>>(b"version").ok_or(Error::ParseError("version"))?;
318        let version_str = String::from_utf8_lossy(&version_bytes);
319        let version = Ver::try_from(version_str.as_ref()).map_err(|_| Error::ParseError("invalid_version_format"))?;
320
321        // parse optional anr_name and anr_desc fields (they may be nil atom or binary)
322        let anr_name = map
323            .get_binary::<Vec<u8>>(b"anr_name")
324            .and_then(|bytes| String::from_utf8(bytes).ok())
325            .filter(|s| !s.is_empty());
326
327        let anr_desc = map
328            .get_binary::<Vec<u8>>(b"anr_desc")
329            .and_then(|bytes| String::from_utf8(bytes).ok())
330            .filter(|s| !s.is_empty());
331
332        // Compute Blake3 hash fields for indexing (v1.1.8 compatibility)
333        let pk_b3 = Hash::from(blake3::hash(pk.as_ref()));
334        let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
335
336        Ok(Self {
337            ip4,
338            pk,
339            pop,
340            port,
341            signature,
342            ts,
343            version,
344            anr_name,
345            anr_desc,
346            handshaked: false,
347            hasChainPop: false,
348            error: None,
349            error_tries: 0,
350            next_check: 0,
351            pk_b3,
352            pk_b3_f4,
353            proto_reqs: HashMap::new(),
354            udp_packets: 0,
355        })
356    }
357
358    // verify anr signature and proof of possession
359    pub fn verify_signature(&self) -> bool {
360        // verify proof of possession (pop is signature of pk with pk as key)
361        // this proves the sender owns the private key for pk
362        if verify(&*self.pk, &self.pop, &*self.pk, crate::consensus::DST_POP).is_err() {
363            return false;
364        }
365
366        // verify main signature using vecpak encoding (matching Elixir's RDB.vecpak_encode)
367        let to_sign = self.to_vecpak_for_signing();
368        if verify(&*self.pk, &self.signature, &to_sign, crate::consensus::DST_ANR).is_err() {
369            return false;
370        }
371
372        true
373    }
374
375    // verify and unpack anr from untrusted source
376    pub fn verify_and_unpack(anr: Anr) -> Result<Anr, Error> {
377        let now_s = get_unix_secs_now();
378        if (anr.ts as i64) - (now_s as i64) > 3600 {
379            return Err(Error::InvalidTimestamp);
380        }
381
382        // check size limit (390 bytes in elixir) using vecpak format
383        let packed_anr = anr.pack();
384        let serialized = amadeus_utils::vecpak::encode(packed_anr.to_vecpak_term());
385        if serialized.len() > 390 {
386            return Err(Error::TooLarge(serialized.len()));
387        }
388
389        // verify signature
390        if !anr.verify_signature() {
391            return Err(Error::InvalidSignature);
392        }
393
394        Ok(packed_anr)
395    }
396
397    pub fn to_vecpak_for_signing(&self) -> Vec<u8> {
398        // convert ANR to vecpak format for signing (matching Elixir's RDB.vecpak_encode)
399        use amadeus_utils::vecpak::encode;
400        let term = self.to_vecpak_term_without_signature();
401        encode(term)
402    }
403
404    pub fn to_vecpak_term(&self) -> vecpak::Term {
405        let mut pairs = self.to_vecpak_term_without_signature();
406        if let vecpak::Term::PropList(ref mut p) = pairs {
407            p.push((vecpak::Term::Binary(b"signature".to_vec()), vecpak::Term::Binary(self.signature.clone())));
408        }
409        pairs
410    }
411
412    fn to_vecpak_term_without_signature(&self) -> vecpak::Term {
413        let mut pairs = Vec::new();
414
415        // NOTE: anr_desc and anr_name are only included if they have a value.
416        // if they're None, the field is NOT added to the map. this matches
417        // elixir behavior where nil fields are not added during signing.
418        if let Some(desc) = &self.anr_desc {
419            pairs.push((vecpak::Term::Binary(b"anr_desc".to_vec()), vecpak::Term::Binary(desc.as_bytes().to_vec())));
420        }
421
422        if let Some(name) = &self.anr_name {
423            pairs.push((vecpak::Term::Binary(b"anr_name".to_vec()), vecpak::Term::Binary(name.as_bytes().to_vec())));
424        }
425
426        pairs.push((
427            vecpak::Term::Binary(b"ip4".to_vec()),
428            vecpak::Term::Binary(self.ip4.to_string().as_bytes().to_vec()),
429        ));
430        pairs.push((vecpak::Term::Binary(b"pk".to_vec()), vecpak::Term::Binary(self.pk.to_vec())));
431        pairs.push((vecpak::Term::Binary(b"pop".to_vec()), vecpak::Term::Binary(self.pop.clone())));
432        pairs.push((vecpak::Term::Binary(b"port".to_vec()), vecpak::Term::VarInt(self.port as i128)));
433        pairs.push((vecpak::Term::Binary(b"ts".to_vec()), vecpak::Term::VarInt(self.ts as i128)));
434        pairs.push((
435            vecpak::Term::Binary(b"version".to_vec()),
436            vecpak::Term::Binary(self.version.to_string().as_bytes().to_vec()),
437        ));
438
439        vecpak::Term::PropList(pairs)
440    }
441
442    // unpack anr with port validation like elixir
443    pub fn unpack(anr: Anr) -> Result<Anr, Error> {
444        if anr.port == 36969 {
445            // Compute Blake3 hash fields for compatibility
446            let pk_b3 = Hash::from(blake3::hash(anr.pk.as_ref()));
447            let pk_b3_f4 = B3f4::new(pk_b3.as_ref());
448
449            Ok(Anr {
450                ip4: anr.ip4,
451                pk: anr.pk,
452                pop: anr.pop,
453                port: anr.port,
454                signature: anr.signature,
455                ts: anr.ts,
456                version: anr.version,
457                anr_name: anr.anr_name,
458                anr_desc: anr.anr_desc,
459                handshaked: false,
460                hasChainPop: false,
461                pk_b3,
462                pk_b3_f4,
463                error: None,
464                error_tries: 0,
465                next_check: 0,
466                proto_reqs: HashMap::new(),
467                udp_packets: 0,
468            })
469        } else {
470            Err(Error::InvalidPort(anr.port))
471        }
472    }
473
474    // pack anr for network transmission
475    pub fn pack(&self) -> Anr {
476        Anr {
477            ip4: self.ip4.clone(),
478            pk: self.pk,
479            pop: self.pop.clone(),
480            port: self.port,
481            signature: self.signature.clone(),
482            ts: self.ts,
483            version: self.version.clone(),
484            anr_name: self.anr_name.clone(),
485            anr_desc: self.anr_desc.clone(),
486            handshaked: false,
487            hasChainPop: false,
488            pk_b3: self.pk_b3,
489            pk_b3_f4: self.pk_b3_f4,
490            error: None,
491            error_tries: 0,
492            next_check: 0,
493            proto_reqs: HashMap::new(),
494            udp_packets: 0,
495        }
496    }
497}
498
499/// NodeRegistry manages the network-wide identity verification system
500/// Tracks ANR (Amadeus Network Record) entries with cryptographic signatures
501#[derive(Debug, Clone)]
502pub struct NodeAnrs {
503    store: Arc<RwLock<HashMap<PublicKey, Anr>>>,
504}
505
506impl NodeAnrs {
507    /// Create a new NodeRegistry instance
508    pub fn new() -> Self {
509        Self { store: Arc::new(RwLock::new(HashMap::new())) }
510    }
511
512    /// Create with default configuration
513    pub fn default() -> Self {
514        Self::new()
515    }
516
517    /// Insert or update anr record
518    pub async fn insert(&self, mut anr: Anr) {
519        // check if we have chain pop for this pk (would need consensus module)
520        // let hasChainPop = consensus::chain_pop(&anr.pk).is_some();
521        anr.hasChainPop = false; // placeholder
522
523        let pk = anr.pk;
524
525        // check if anr already exists and update accordingly
526        let mut map = self.store.write().await;
527        map.entry(pk.clone())
528            .and_modify(|old| {
529                // only update if newer timestamp
530                if anr.ts > old.ts {
531                    // check if ip4/port changed
532                    let same_ip4_port = old.ip4 == anr.ip4 && old.port == anr.port;
533                    if !same_ip4_port {
534                        // reset handshake status
535                        anr.handshaked = false;
536                        anr.error = None;
537                        anr.error_tries = 0;
538                        anr.next_check = get_unix_secs_now() + 3;
539                    } else {
540                        // preserve handshake status
541                        anr.handshaked = old.handshaked;
542                    }
543                    *old = anr.clone();
544                }
545            })
546            .or_insert_with(|| {
547                // new anr
548                anr.handshaked = false;
549                anr.error = None;
550                anr.error_tries = 0;
551                anr.next_check = get_unix_secs_now() + 3;
552                anr
553            });
554    }
555
556    /// Get anr by public key
557    pub async fn get(&self, pk: &[u8]) -> Option<Anr> {
558        let map = self.store.read().await;
559        map.get(pk).cloned()
560    }
561
562    /// Get anr by IP address
563    pub async fn get_by_ip4(&self, ip4: Ipv4Addr) -> Option<Anr> {
564        let map = self.store.read().await;
565        map.values().find(|anr| anr.ip4 == ip4).cloned()
566    }
567
568    /// Get all anrs
569    pub async fn get_all_b3f4(&self) -> Vec<B3f4> {
570        let map = self.store.read().await;
571        let anrs: Vec<B3f4> = map.values().cloned().map(|a| a.pk_b3_f4).collect();
572        anrs
573    }
574
575    /// Get all anrs
576    pub async fn get_all(&self) -> Vec<Anr> {
577        let map = self.store.read().await;
578        let anrs: Vec<Anr> = map.values().cloned().collect();
579        anrs
580    }
581
582    /// Increment and return the frames counter
583    pub async fn is_within_udp_limit(&self, ip4: Ipv4Addr) -> Option<bool> {
584        let mut map = self.store.write().await;
585        if let Some(anr) = map.values_mut().find(|anr| anr.ip4 == ip4) {
586            anr.udp_packets += 1;
587            return Some(anr.udp_packets < UDP_PACKETS_LIMIT);
588        }
589        None
590    }
591
592    /// Check if protocol message is within rate limit for this peer
593    pub async fn is_within_proto_limit(&self, pk: &[u8], typename: &str) -> Option<bool> {
594        let mut map = self.store.write().await;
595        if let Some(anr) = map.get_mut(pk)
596            && let Some(limit) = PROTO_RATE_LIMITS.get(typename)
597        {
598            let counter = anr.proto_reqs.entry(typename.to_string()).or_insert(0);
599            *counter += 1;
600            return Some(*counter < *limit);
601        }
602        None
603    }
604
605    /// Reset rate limiting counters for all anrs
606    pub async fn update_rate_limiting_counters(&self) {
607        let mut map = self.store.write().await;
608        for anr in map.values_mut() {
609            anr.udp_packets = anr.udp_packets.saturating_sub(UDP_PACKETS_LIMIT / 2);
610            // decrement all proto counters by half their limits
611            for (typename, limit) in PROTO_RATE_LIMITS.iter() {
612                if let Some(counter) = anr.proto_reqs.get_mut(*typename) {
613                    *counter = counter.saturating_sub(*limit / 2);
614                }
615            }
616        }
617    }
618
619    /// Reset handshaked status (will silently return if pk not found)
620    pub async fn unset_handshaked(&self, pk: &[u8]) {
621        let mut map = self.store.write().await;
622        if let Some(anr) = map.get_mut(pk) {
623            anr.handshaked = false;
624        }
625    }
626
627    /// Set handshaked status (will silently return if pk not found)
628    pub async fn set_handshaked(&self, pk: &[u8]) {
629        let mut map = self.store.write().await;
630        if let Some(anr) = map.get_mut(pk) {
631            anr.handshaked = true;
632        }
633    }
634
635    /// Get all handshaked node public keys
636    pub async fn handshaked(&self) -> Vec<PublicKey> {
637        let map = self.store.read().await;
638        let mut pks = Vec::new();
639        for (k, v) in map.iter() {
640            if v.handshaked {
641                pks.push(k.clone());
642            }
643        }
644        pks
645    }
646
647    /// Get all not handshaked (pk, ip4) pairs
648    pub async fn get_all_not_handshaked_ip4(&self) -> Vec<Ipv4Addr> {
649        let map = self.store.read().await;
650        let mut results = Vec::new();
651
652        for (_, v) in map.iter() {
653            if !v.handshaked {
654                results.push(v.ip4);
655            }
656        }
657
658        results
659    }
660
661    /// Check if node is handshaked
662    pub async fn is_handshaked(&self, pk: &[u8]) -> bool {
663        let map = self.store.read().await;
664        map.get(pk).map(|v| v.handshaked).unwrap_or(false)
665    }
666
667    /// Check if node is handshaked with valid ip4
668    pub async fn handshaked_and_valid_ip4(&self, pk: &[u8], ip4: &Ipv4Addr) -> bool {
669        let map = self.store.read().await;
670        map.get(pk).map(|v| v.handshaked && v.ip4 == *ip4).unwrap_or(false)
671    }
672
673    /// Get random verified nodes
674    pub async fn get_random_verified(&self, count: usize) -> Vec<Ipv4Addr> {
675        use rand::seq::IndexedRandom;
676        use std::collections::HashSet;
677
678        // deduplicate by ip4
679        let mut seen_ips = HashSet::new();
680        let mut unique_pairs = Vec::new();
681        for ip4 in self.get_all_handshaked_ip4().await {
682            if seen_ips.insert(ip4) {
683                unique_pairs.push(ip4);
684            }
685        }
686
687        let mut rng = rand::rng();
688        let selected: Vec<_> = unique_pairs.choose_multiple(&mut rng, count).cloned().collect();
689
690        selected
691    }
692
693    /// Get random unverified nodes
694    pub async fn get_random_not_verified(&self, count: usize) -> Vec<Ipv4Addr> {
695        use rand::seq::IndexedRandom;
696        use std::collections::HashSet;
697
698        // deduplicate by ip4
699        let mut seen_ips = HashSet::new();
700        let mut unique_pairs = Vec::new();
701        for ip4 in self.get_all_not_handshaked_ip4().await {
702            if seen_ips.insert(ip4) {
703                unique_pairs.push(ip4);
704            }
705        }
706
707        let mut rng = rand::rng();
708        let selected: Vec<_> = unique_pairs.choose_multiple(&mut rng, count).cloned().collect();
709
710        selected
711    }
712
713    /// Get all handshaked (pk, ip4) pairs
714    pub async fn get_all_excluding_b3f4(&self, b3f4: &[B3f4]) -> Vec<Anr> {
715        let map = self.store.read().await;
716        let mut results = Vec::new();
717        for (_, v) in map.iter() {
718            if !b3f4.contains(&v.pk_b3_f4) {
719                results.push(v.clone());
720            }
721        }
722        results
723    }
724
725    /// Get all handshaked (pk, ip4) pairs
726    pub async fn get_all_handshaked_ip4(&self) -> Vec<Ipv4Addr> {
727        let map = self.store.read().await;
728        let mut results = Vec::new();
729
730        for (_, v) in map.iter() {
731            if v.handshaked {
732                results.push(v.ip4);
733            }
734        }
735
736        results
737    }
738
739    /// Get ip addresses for given public keys
740    pub async fn by_pks_ip<T: AsRef<[u8]>>(&self, pks: &[T]) -> Vec<Ipv4Addr> {
741        // build a set of owned pk bytes for efficient lookup
742        let pk_set: std::collections::HashSet<PublicKey> = pks
743            .iter()
744            .filter_map(|p| {
745                let bytes = p.as_ref();
746                if bytes.len() == 48 {
747                    let mut array = [0u8; 48];
748                    array.copy_from_slice(bytes);
749                    Some(PublicKey::from(array))
750                } else {
751                    None
752                }
753            })
754            .collect();
755        let mut ips = Vec::new();
756
757        let map = self.store.read().await;
758        for v in map.values() {
759            if pk_set.contains(&v.pk) {
760                ips.push(v.ip4);
761            }
762        }
763
764        ips
765    }
766
767    /// Seed initial anrs (called on startup)
768    pub async fn seed(&self, config: &Config) {
769        for anr in config.seed_anrs.iter().cloned().map(Into::<Anr>::into) {
770            self.insert(anr).await;
771        }
772
773        if let Ok(my_anr) = Anr::from_config(config) {
774            self.insert(my_anr).await;
775            self.set_handshaked(config.get_pk().as_ref()).await;
776        }
777    }
778}
779
780#[cfg(test)]
781mod tests {
782    use super::*;
783    use amadeus_utils::bls12_381;
784
785    impl NodeAnrs {
786        pub async fn clear_all(&self) {
787            self.store.write().await.clear()
788        }
789
790        pub async fn count(&self) -> usize {
791            self.store.read().await.len()
792        }
793    }
794
795    #[tokio::test]
796    async fn test_anr_operations() {
797        let registry = NodeAnrs::new();
798
799        // create test keys with unique pk to avoid conflicts
800        let _sk = [1; 32];
801        let mut pk = [2; 48];
802        let pid_bytes = std::process::id().to_le_bytes();
803        let now_ns_bytes = crate::utils::misc::get_unix_nanos_now().to_le_bytes();
804        pk[..4].copy_from_slice(&pid_bytes);
805        pk[4..12].copy_from_slice(&now_ns_bytes[..8]);
806
807        let pop = vec![3; 96];
808        let ip4 = Ipv4Addr::new(127, 0, 0, 1);
809        let version = Ver::new(1, 0, 0);
810
811        // manually create ANR without signature verification for testing
812        let pk_b3 = blake3::hash(&pk);
813        let pk_b3_f4 = B3f4::new(&pk_b3);
814
815        let pk_wrapped = PublicKey::new(pk);
816        let anr = Anr {
817            ip4,
818            pk: pk_wrapped,
819            pop,
820            port: 36969,
821            signature: vec![0; 96],
822            ts: 1234567890,
823            version,
824            anr_name: None,
825            anr_desc: None,
826            handshaked: false,
827            hasChainPop: false,
828            error: None,
829            error_tries: 0,
830            next_check: 1234567893,
831            pk_b3: Hash::new(pk_b3.into()),
832            pk_b3_f4,
833            proto_reqs: HashMap::new(),
834            udp_packets: 0,
835        };
836
837        // test insert
838        registry.insert(anr.clone()).await;
839
840        // test get
841        let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
842        assert_eq!(&*retrieved.pk, &pk);
843        assert!(!retrieved.handshaked, "Expected handshaked to be false after insert, got true");
844
845        // test set_handshaked
846        registry.set_handshaked(pk_wrapped.as_ref()).await;
847        let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
848        assert!(retrieved.handshaked, "Expected handshaked to be true after set_handshaked");
849
850        // test handshaked query
851        let handshaked_pks = registry.handshaked().await;
852        assert!(handshaked_pks.iter().any(|p| &**p == &pk), "pk should be in handshaked list");
853
854        // test is_handshaked
855        assert!(registry.is_handshaked(pk_wrapped.as_ref()).await, "is_handshaked should return true");
856
857        // test get_all
858        let all = registry.get_all().await;
859        assert!(!all.is_empty());
860        assert!(all.iter().any(|a| &*a.pk == &pk));
861
862        // test count functions
863        let total_count = registry.count().await;
864        assert!(total_count >= 1, "Expected at least 1 ANR, got {}", total_count);
865
866        // cleanup
867        registry.clear_all().await;
868
869        // verify our pk was removed
870        assert!(registry.get(pk_wrapped.as_ref()).await.is_none(), "Our pk should be removed");
871    }
872
873    #[tokio::test]
874    async fn test_anr_update() {
875        let registry = NodeAnrs::new();
876
877        // create unique pk for this test
878        let mut pk = [1; 48];
879        let pid_bytes = std::process::id().to_le_bytes();
880        let now_ns_bytes = crate::utils::misc::get_unix_nanos_now().to_le_bytes();
881        pk[..4].copy_from_slice(&pid_bytes);
882        pk[4..12].copy_from_slice(&now_ns_bytes[..8]);
883        let pop = vec![2; 96];
884        let ip4 = Ipv4Addr::new(192, 168, 1, 1);
885        let version = Ver::new(1, 0, 0);
886
887        // compute Blake3 fields for testing
888        let pk_b3 = blake3::hash(&pk);
889        let pk_b3_f4 = B3f4::new(&pk_b3);
890
891        let pk_wrapped = PublicKey::new(pk);
892        // insert initial anr
893        let anr1 = Anr {
894            ip4,
895            pk: pk_wrapped,
896            pop: pop.clone(),
897            port: 36969,
898            signature: vec![0; 96],
899            ts: 1000,
900            version: version.clone(),
901            anr_name: None,
902            anr_desc: None,
903            handshaked: true,
904            hasChainPop: false,
905            error: None,
906            error_tries: 0,
907            next_check: 1003,
908            pk_b3: Hash::new(pk_b3.into()),
909            pk_b3_f4,
910            proto_reqs: HashMap::new(),
911            udp_packets: 0,
912        };
913        registry.insert(anr1).await;
914        registry.set_handshaked(pk_wrapped.as_ref()).await;
915
916        // try to insert older anr (should not update)
917        let anr2 = Anr {
918            ip4: Ipv4Addr::new(10, 0, 0, 1),
919            pk: pk_wrapped,
920            pop: pop.clone(),
921            port: 36969,
922            signature: vec![0; 96],
923            ts: 999,
924            version: version.clone(),
925            anr_name: None,
926            anr_desc: None,
927            handshaked: false,
928            hasChainPop: false,
929            error: None,
930            error_tries: 0,
931            next_check: 1002,
932            pk_b3: Hash::new(pk_b3.into()),
933            pk_b3_f4,
934            proto_reqs: HashMap::new(),
935            udp_packets: 0,
936        };
937        registry.insert(anr2).await;
938
939        // verify old anr was not updated
940        let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
941        assert_eq!(retrieved.ip4, Ipv4Addr::new(192, 168, 1, 1));
942        assert_eq!(retrieved.ts, 1000);
943        assert!(retrieved.handshaked);
944
945        // insert newer anr with same ip (should preserve handshake)
946        let anr3 = Anr {
947            ip4,
948            pk: pk_wrapped,
949            pop: pop.clone(),
950            port: 36969,
951            signature: vec![0; 96],
952            ts: 2000,
953            version: Ver::new(2, 0, 0),
954            anr_name: None,
955            anr_desc: None,
956            handshaked: false,
957            hasChainPop: false,
958            error: None,
959            error_tries: 0,
960            next_check: 2003,
961            pk_b3: Hash::new(pk_b3.into()),
962            pk_b3_f4,
963            proto_reqs: HashMap::new(),
964            udp_packets: 0,
965        };
966        registry.insert(anr3).await;
967
968        let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
969        assert_eq!(retrieved.ts, 2000);
970        assert_eq!(retrieved.version, Ver::new(2, 0, 0));
971        assert!(retrieved.handshaked); // should be preserved
972
973        // insert newer anr with different ip (should reset handshake)
974        let anr4 = Anr {
975            ip4: Ipv4Addr::new(10, 0, 0, 1),
976            pk: pk_wrapped,
977            pop,
978            port: 36969,
979            signature: vec![0; 96],
980            ts: 3000,
981            version: Ver::new(3, 0, 0),
982            anr_name: None,
983            anr_desc: None,
984            handshaked: true,
985            hasChainPop: false,
986            error: Some("old error".to_string()),
987            error_tries: 5,
988            next_check: 3003,
989            pk_b3: Hash::new(pk_b3.into()),
990            pk_b3_f4,
991            proto_reqs: HashMap::new(),
992            udp_packets: 0,
993        };
994        registry.insert(anr4).await;
995
996        let retrieved = registry.get(pk_wrapped.as_ref()).await.unwrap();
997        assert_eq!(retrieved.ts, 3000);
998        assert_eq!(retrieved.ip4, Ipv4Addr::new(10, 0, 0, 1));
999        assert!(!retrieved.handshaked); // should be reset
1000        assert_eq!(retrieved.error, None); // should be reset
1001        assert_eq!(retrieved.error_tries, 0); // should be reset
1002
1003        // cleanup
1004        registry.clear_all().await;
1005    }
1006
1007    #[tokio::test]
1008    async fn test_get_random_not_handshaked_multiple() {
1009        let registry = NodeAnrs::new();
1010
1011        // Create 5 unique not-handshaked ANRs
1012        for i in 1..=5 {
1013            let mut pk = [i as u8; 48];
1014            let now_ns_bytes = crate::utils::misc::get_unix_nanos_now().to_le_bytes();
1015            pk[40..48].copy_from_slice(&now_ns_bytes[..8]);
1016
1017            // compute Blake3 fields for this pk
1018            let pk_b3 = blake3::hash(&pk);
1019            let pk_b3_f4 = B3f4::new(&pk_b3);
1020
1021            let anr = Anr {
1022                ip4: Ipv4Addr::new(192, 168, 1, i), // different IPs
1023                pk: PublicKey::new(pk),
1024                pop: vec![i as u8; 96],
1025                port: 36969,
1026                signature: vec![i as u8; 96],
1027                ts: 1000 + i as u32,
1028                version: Ver::new(1, 0, i as u8),
1029                anr_name: None,
1030                anr_desc: None,
1031                handshaked: false, // Explicitly not handshaked
1032                hasChainPop: false,
1033                error: None,
1034                error_tries: 0,
1035                next_check: 2000,
1036                pk_b3: Hash::new(pk_b3.into()),
1037                pk_b3_f4,
1038                proto_reqs: HashMap::new(),
1039                udp_packets: 0,
1040            };
1041
1042            registry.insert(anr).await;
1043        }
1044
1045        // Test multiple calls to ensure randomness and correct count
1046        for run in 1..=10 {
1047            let result = registry.get_random_not_verified(3).await;
1048
1049            // Should return 3 results since we have 5 candidates
1050            assert_eq!(result.len(), 3, "Run {}: expected 3 results, got {}", run, result.len());
1051
1052            // All should have different IPs (uniqueness check)
1053            let mut ips = std::collections::HashSet::new();
1054            for ip in &result {
1055                assert!(ips.insert(*ip), "Run {}: duplicate IP found: {}", run, ip);
1056            }
1057        }
1058
1059        // Test asking for more than available
1060        let result = registry.get_random_not_verified(10).await;
1061        assert_eq!(result.len(), 5, "Should return all 5 when asking for 10");
1062
1063        // cleanup
1064        registry.clear_all().await;
1065    }
1066
1067    #[test]
1068    fn test_anr_vecpak_signature() {
1069        // Generate test keys
1070        let sk = bls12_381::generate_sk();
1071        let pk = bls12_381::get_public_key(&sk).expect("get pk");
1072        let pop = bls12_381::sign(&sk, pk.as_ref(), crate::consensus::DST_POP).expect("sign pop");
1073
1074        // Build ANR with vecpak signature
1075        let anr =
1076            Anr::build(&sk, &pk, pop.as_ref(), Ipv4Addr::new(127, 0, 0, 1), Ver::new(1, 2, 5)).expect("build anr");
1077
1078        // Verify the signature works
1079        assert!(anr.verify_signature(), "ANR signature should verify");
1080
1081        // Print the vecpak encoded data for debugging
1082        let vecpak_data = anr.to_vecpak_for_signing();
1083        println!("Vecpak encoded ANR (hex): {}", hex::encode(&vecpak_data));
1084        println!("Vecpak size: {} bytes", vecpak_data.len());
1085
1086        // Ensure it's using vecpak format, not ETF
1087        // Vecpak starts with tags 0-7, ETF starts with 131
1088        assert!(vecpak_data[0] <= 7, "Should be vecpak format, not ETF");
1089    }
1090
1091    #[test]
1092    fn test_anr_signature_compatibility() {
1093        // This test verifies that our ANR signatures match Elixir's format
1094        // by checking the structure of the signed data
1095
1096        let sk = bls12_381::generate_sk();
1097        let pk = bls12_381::get_public_key(&sk).expect("get pk");
1098        let pop = bls12_381::sign(&sk, pk.as_ref(), crate::consensus::DST_POP).expect("sign pop");
1099
1100        let anr =
1101            Anr::build(&sk, &pk, pop.as_ref(), Ipv4Addr::new(192, 168, 1, 1), Ver::new(1, 2, 3)).expect("build anr");
1102
1103        // Get the data we're signing
1104        let to_sign = anr.to_vecpak_for_signing();
1105
1106        // Decode it to see the structure
1107        let decoded = vecpak::decode(&to_sign).expect("decode vecpak");
1108
1109        println!("Decoded vecpak term: {:?}", decoded);
1110
1111        // Verify it's a PropList (map in vecpak)
1112        match decoded {
1113            vecpak::Term::PropList(pairs) => {
1114                println!("ANR has {} fields", pairs.len());
1115                for (key, _value) in &pairs {
1116                    if let vecpak::Term::Binary(key_bytes) = key {
1117                        let key_str = String::from_utf8_lossy(key_bytes);
1118                        println!("Field: {}", key_str);
1119                    }
1120                }
1121            }
1122            _ => panic!("Expected PropList for ANR"),
1123        }
1124    }
1125}