Skip to main content

amadeus_node/node/
peers.rs

1use crate::config::Config;
2use crate::consensus::doms::EntrySummary;
3use crate::node::anr;
4use crate::node::protocol::{EventTip, PingReply, Typename};
5use crate::utils::misc::get_unix_millis_now;
6use crate::utils::{Hash, PublicKey};
7use crate::{Context, Ver};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::hash::Hash as StdHash;
11use std::net::Ipv4Addr;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use tokio::task::spawn_blocking;
15use tracing::{info, warn};
16
17/// Represents the different stages of the handshake process
18#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19#[serde(rename_all = "snake_case")]
20pub enum HandshakeStatus {
21    /// No handshake initiated
22    None,
23    /// Sent new_phone_who_dis message, waiting for what response
24    Initiated,
25    /// Sent/received what message response (handshake done for us)
26    Completed,
27}
28
29#[derive(Debug)]
30struct ConcurrentMap<K, V> {
31    inner: RwLock<HashMap<K, V>>,
32}
33
34impl<K: Eq + StdHash + Clone, V: Clone> ConcurrentMap<K, V> {
35    fn new() -> Self {
36        Self { inner: RwLock::new(HashMap::new()) }
37    }
38    async fn len(&self) -> usize {
39        self.inner.read().await.len()
40    }
41    async fn insert(&self, key: K, value: V) -> Result<(), ()> {
42        let mut map = self.inner.write().await;
43        if map.contains_key(&key) {
44            Err(())
45        } else {
46            map.insert(key, value);
47            Ok(())
48        }
49    }
50    async fn remove(&self, key: &K) -> Option<V> {
51        self.inner.write().await.remove(key)
52    }
53    async fn read<R>(&self, key: &K, f: impl FnOnce(&K, &V) -> R) -> Option<R> {
54        let v = {
55            let map = self.inner.read().await;
56            map.get(key).cloned()
57        };
58        v.as_ref().map(|vv| f(key, vv))
59    }
60    async fn scan(&self, mut f: impl FnMut(&K, &V)) {
61        let snapshot: Vec<(K, V)> = {
62            let map = self.inner.read().await;
63            map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
64        };
65        for (k, v) in snapshot.iter() {
66            f(k, v);
67        }
68    }
69    async fn update<R>(&self, key: &K, mut f: impl FnMut(&K, &mut V) -> R) -> Option<R> {
70        let mut map = self.inner.write().await;
71        if let Some(v) = map.get_mut(key) { Some(f(key, v)) } else { None }
72    }
73}
74
75#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
76pub enum Error {
77    #[error(transparent)]
78    Anr(#[from] anr::Error),
79}
80
81impl crate::utils::misc::Typename for Error {
82    fn typename(&self) -> &'static str {
83        self.into()
84    }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct Peer {
89    pub ip: Ipv4Addr,
90    pub pk: Option<PublicKey>,
91    pub version: Option<Ver>,
92    pub latency: Option<u64>,
93    pub last_msg: u64,
94    pub last_ping: Option<u64>,
95    pub last_pong: Option<u64>,
96    pub shared_secret: Option<Vec<u8>>,
97    pub temporal: Option<TipInfo>,
98    pub rooted: Option<TipInfo>,
99    pub last_seen_ms: u64,
100    pub last_msg_type: Option<String>,
101    pub handshake_status: HandshakeStatus,
102}
103
104impl Peer {
105    /// Check if the handshake is completed (either we sent what or received what)
106    pub fn is_handshaked(&self) -> bool {
107        self.handshake_status == HandshakeStatus::Completed
108    }
109
110    /// Check if a peer is online
111    pub fn is_online(&self) -> bool {
112        // Peer is online if last ping was within 6 seconds (6000 ms)
113        // Use saturating_sub to prevent overflow if last_ping > ts_m
114        get_unix_millis_now().saturating_sub(self.last_ping.unwrap_or_default()) <= 6_000
115    }
116}
117
118#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
119pub struct TipInfo {
120    pub height: u64,
121    pub prev_hash: Hash,
122}
123
124impl From<EntrySummary> for TipInfo {
125    fn from(summary: EntrySummary) -> Self {
126        Self { height: summary.header.height, prev_hash: summary.header.prev_hash }
127    }
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct HeaderInfo {
132    pub height: u64,
133    pub prev_hash: Hash,
134}
135
136/// NodePeers structure managing the peer database
137#[derive(Debug, Clone)]
138pub struct NodePeers {
139    peers: Arc<ConcurrentMap<Ipv4Addr, Peer>>,
140    max_peers: usize,
141}
142
143impl NodePeers {
144    /// Create a new NodePeers instance
145    pub fn new(max_peers: usize) -> Self {
146        Self { peers: Arc::new(ConcurrentMap::new()), max_peers }
147    }
148
149    /// Create with default max_peers of 100
150    pub fn default() -> Self {
151        Self::new(100)
152    }
153
154    pub async fn clear_stale(&self, fabric: &crate::consensus::fabric::Fabric, node_registry: &anr::NodeAnrs) -> usize {
155        self.clear_stale_inner(fabric, node_registry)
156            .await
157            .inspect_err(|e| warn!("peer cleanup error: {}", e))
158            .unwrap_or(0)
159    }
160
161    /// Clear stale peers and add missing validators/handshaked nodes
162    pub async fn clear_stale_inner(
163        &self,
164        fabric: &crate::consensus::fabric::Fabric,
165        node_registry: &anr::NodeAnrs,
166    ) -> Result<usize, Error> {
167        let ts_m = get_unix_millis_now();
168        let height = fabric.get_temporal_height_or_0();
169        let validators = fabric.trainers_for_height(height + 1).unwrap_or_default();
170        let validators_vec: Vec<Vec<u8>> = validators.iter().map(|pk| pk.to_vec()).collect();
171
172        let validator_anr_ips = node_registry.by_pks_ip(&validators_vec).await;
173        let validators_map: std::collections::HashSet<&PublicKey> = validators.iter().collect();
174
175        let handshaked_ips = node_registry.get_all_handshaked_ip4().await;
176
177        let mut cur_ips = Vec::new();
178        let mut cur_val_ips = Vec::new();
179
180        // Clean stale peers and collect current IPs
181        let mut to_remove = Vec::new();
182        self.peers
183            .scan(|ip, peer| {
184                // Remove peers that haven't sent messages in 60 seconds (60*1000 ms)
185                if ts_m > (peer.last_msg + 60_000) {
186                    to_remove.push(*ip);
187                    return;
188                }
189
190                if let Some(ref pk) = peer.pk {
191                    if validators_map.contains(pk) {
192                        cur_val_ips.push(*ip);
193                    } else {
194                        cur_ips.push(*ip);
195                    }
196                } else {
197                    cur_ips.push(*ip);
198                }
199            })
200            .await;
201
202        // Remove stale peers after scanning
203        let cleared_count = to_remove.len();
204        for ip in to_remove {
205            let _ = self.peers.remove(&ip).await;
206        }
207
208        // Find missing validators and handshaked peers
209        let missing_vals: Vec<_> = validator_anr_ips.iter().filter(|ip| !cur_val_ips.contains(ip)).cloned().collect();
210
211        let missing_ips: Vec<_> = handshaked_ips.into_iter().filter(|ip| !cur_ips.contains(ip)).collect();
212
213        // Get max_peers config
214        let add_size = self
215            .max_peers
216            .saturating_sub(self.size().await)
217            .saturating_sub(cur_val_ips.len())
218            .saturating_sub(missing_vals.len());
219
220        let missing_ips = spawn_blocking(move || {
221            // Shuffle and take limited missing IPs
222            let mut missing_ips = missing_ips;
223            use rand::seq::SliceRandom;
224            let mut rng = rand::rng();
225            missing_ips.shuffle(&mut rng);
226            missing_ips.truncate(add_size);
227            missing_ips
228        })
229        .await
230        .unwrap_or_default();
231
232        // Add missing validators and peers with proper handshake status from ANR
233        for ip in missing_vals.iter().chain(missing_ips.iter()) {
234            // Find the ANR for this IP to get handshake status
235            let mut handshake_status = HandshakeStatus::None;
236            let anrs = node_registry.get_all().await;
237
238            for anr in anrs {
239                if anr.ip4 == *ip {
240                    handshake_status = if anr.handshaked { HandshakeStatus::Completed } else { HandshakeStatus::None };
241                    break;
242                }
243            }
244
245            let _ = self
246                .insert_new_peer(Peer {
247                    ip: *ip,
248                    pk: None,
249                    version: None,
250                    latency: None,
251                    last_msg: ts_m,
252                    last_ping: None,
253                    last_pong: None,
254                    shared_secret: None,
255                    temporal: None,
256                    rooted: None,
257                    last_seen_ms: ts_m,
258                    last_msg_type: None,
259                    handshake_status,
260                })
261                .await;
262        }
263
264        Ok(cleared_count)
265    }
266
267    /// Insert a new peer if it doesn't already exist
268    pub async fn insert_new_peer(&self, mut peer: Peer) -> bool {
269        if peer.last_msg == 0 {
270            peer.last_msg = get_unix_millis_now();
271        }
272
273        self.peers.insert(peer.ip, peer).await.is_ok()
274    }
275
276    /// Seed initial peers with validators
277    pub async fn seed(
278        &self,
279        fabric: &crate::consensus::fabric::Fabric,
280        config: &Config,
281        node_anrs: &anr::NodeAnrs,
282    ) -> Result<(), Error> {
283        let height = fabric.get_temporal_height_or_0();
284        let validators = fabric.trainers_for_height(height + 1).unwrap_or_default();
285        let validators: Vec<Vec<u8>> = validators.iter().map(|pk| pk.to_vec()).collect();
286
287        let validator_ips: Vec<_> =
288            node_anrs.by_pks_ip(&validators).await.into_iter().filter(|ip| *ip != config.get_public_ipv4()).collect();
289
290        let ts_m = get_unix_millis_now();
291        for ip in validator_ips {
292            let _ = self.insert_new_peer(Peer {
293                ip,
294                pk: None,
295                version: None,
296                latency: None,
297                last_msg: ts_m,
298                last_ping: None,
299                last_pong: None,
300                shared_secret: None,
301                temporal: None,
302                rooted: None,
303                last_seen_ms: ts_m,
304                last_msg_type: None,
305                handshake_status: HandshakeStatus::None,
306            });
307        }
308
309        Ok(())
310    }
311
312    /// Get number of peers
313    pub async fn size(&self) -> usize {
314        self.peers.len().await
315    }
316
317    /// Get random online peers
318    pub async fn random(&self, no: usize) -> Result<Vec<Peer>, Error> {
319        let online_peers = self.get_online().await?;
320        if online_peers.is_empty() {
321            return Ok(vec![]);
322        }
323
324        use rand::seq::SliceRandom;
325        let mut rng = rand::rng();
326        let mut peers = online_peers;
327        peers.shuffle(&mut rng);
328        peers.truncate(no);
329
330        Ok(peers)
331    }
332
333    /// Get all peers
334    pub async fn get_all(&self) -> Result<Vec<Peer>, Error> {
335        let mut peers = Vec::new();
336        self.peers
337            .scan(|_, peer| {
338                peers.push(peer.clone());
339            })
340            .await;
341        Ok(peers)
342    }
343
344    /// Get all online peers
345    pub async fn get_online(&self) -> Result<Vec<Peer>, Error> {
346        let mut online_peers = Vec::new();
347
348        self.peers
349            .scan(|_, peer| {
350                if peer.is_online() {
351                    online_peers.push(peer.clone());
352                }
353            })
354            .await;
355
356        Ok(online_peers)
357    }
358
359    /// Get summary of online peers
360    pub async fn get_online_ip_l_th_rh(&self) -> Result<Vec<(Ipv4Addr, Option<u64>, Option<u64>, Option<u64>)>, Error> {
361        let online_peers = self.get_online().await?;
362        let mut summary = Vec::new();
363
364        for peer in online_peers {
365            let temporal_height = peer.temporal.as_ref().map(|t| t.height);
366            let rooted_height = peer.rooted.as_ref().map(|r| r.height);
367
368            summary.push((peer.ip, peer.latency, temporal_height, rooted_height));
369        }
370
371        Ok(summary)
372    }
373
374    /// Get shared secret for a peer by public key
375    pub async fn get_shared_secret(&self, pk: &[u8]) -> Result<Vec<u8>, Error> {
376        if pk.is_empty() {
377            return Ok(vec![]);
378        }
379
380        let mut found_secret = None;
381        self.peers
382            .scan(|_, peer| {
383                if let Some(ref peer_pk) = peer.pk {
384                    if peer_pk == pk {
385                        found_secret = peer.shared_secret.clone();
386                    }
387                }
388            })
389            .await;
390
391        // Return found secret or empty vector
392        Ok(found_secret.unwrap_or_default())
393    }
394
395    /// Get peer by IP address
396    pub async fn by_ip(&self, ip: Ipv4Addr) -> Option<Peer> {
397        self.peers.read(&ip, |_, peer| peer.clone()).await
398    }
399
400    /// Get peers by multiple public keys
401    pub async fn by_pks(&self, pks: &[Vec<u8>]) -> Result<Vec<Peer>, Error> {
402        let mut peers = Vec::new();
403
404        self.peers
405            .scan(|_, peer| {
406                if let Some(ref peer_pk) = peer.pk {
407                    // Compare peer_pk (which is PublicKey) with pks (which are Vec<u8>)
408                    if pks.iter().any(|pk| pk.as_slice() == peer_pk.as_slice()) {
409                        peers.push(peer.clone());
410                    }
411                }
412            })
413            .await;
414
415        Ok(peers)
416    }
417
418    /// Get peers for a specific height (trainers)
419    pub async fn for_height(&self, fabric: &crate::consensus::fabric::Fabric, height: u64) -> Result<Vec<Peer>, Error> {
420        let trainers = fabric.trainers_for_height(height).unwrap_or_default();
421        let trainers_set: std::collections::HashSet<&PublicKey> = trainers.iter().collect();
422        let mut peers = Vec::new();
423
424        self.peers
425            .scan(|_, peer| {
426                if let Some(ref pk) = peer.pk {
427                    if trainers_set.contains(pk) {
428                        peers.push(peer.clone());
429                    }
430                }
431            })
432            .await;
433
434        Ok(peers)
435    }
436
437    /// Get peer IPs by who specification
438    pub async fn by_who(&self, fabric: &crate::consensus::fabric::Fabric, who: Who) -> Result<Vec<Ipv4Addr>, Error> {
439        match who {
440            Who::Some(peer_ips) => Ok(peer_ips),
441            Who::Trainers => {
442                let height = fabric.get_temporal_height_or_0();
443                let trainer_peers = self.for_height(fabric, height + 1).await?;
444                let mut ips: Vec<_> = trainer_peers.iter().map(|p| p.ip).collect();
445
446                if ips.is_empty() {
447                    return Ok(vec![]);
448                }
449
450                use rand::seq::SliceRandom;
451                let mut rng = rand::rng();
452                ips.shuffle(&mut rng);
453                Ok(ips)
454            }
455            Who::NotTrainers(cnt) => {
456                let height = fabric.get_temporal_height_or_0();
457                let trainer_peers = self.for_height(fabric, height + 1).await?;
458                let trainer_ips: std::collections::HashSet<_> = trainer_peers.iter().map(|p| p.ip).collect();
459
460                let all_peers = self.get_all().await?;
461                let not_trainer_ips: Vec<_> =
462                    all_peers.iter().map(|p| p.ip).filter(|ip| !trainer_ips.contains(ip)).collect();
463
464                if not_trainer_ips.is_empty() {
465                    return Ok(vec![]);
466                }
467
468                use rand::seq::SliceRandom;
469                let mut rng = rand::rng();
470                let mut ips = not_trainer_ips;
471                ips.shuffle(&mut rng);
472                ips.truncate(cnt);
473                Ok(ips)
474            }
475            Who::Random(no) => {
476                let random_peers = self.random(no).await?;
477                Ok(random_peers.iter().map(|p| p.ip).collect())
478            }
479        }
480    }
481
482    // fn apply_latency_filters(
483    //     mut filtered: Vec<(Ipv4Addr, Option<u64>, Option<u32>, Option<u32>)>,
484    //     filter: HeightFilter,
485    // ) -> Result<Vec<(Ipv4Addr, Option<u64>, Option<u32>, Option<u32>)>, Error> {
486    //     let take = filter.take.unwrap_or(3);
487    //
488    //     // Apply latency2 filter first
489    //     if let Some(latency2) = filter.latency2 {
490    //         let new_filtered: Vec<_> =
491    //             filtered.iter().filter(|(_, lat, _, _)| lat.unwrap_or(u64::MAX) <= latency2).cloned().collect();
492    //
493    //         if new_filtered.len() >= take {
494    //             let mut new_filter = filter;
495    //             new_filter.latency2 = None;
496    //             return Self::apply_latency_filters(new_filtered, new_filter);
497    //         }
498    //         // Continue with current filtered list
499    //     }
500    //
501    //     // Apply latency1 filter
502    //     if let Some(latency1) = filter.latency1 {
503    //         let new_filtered: Vec<_> =
504    //             filtered.iter().filter(|(_, lat, _, _)| lat.unwrap_or(u64::MAX) <= latency1).cloned().collect();
505    //
506    //         if new_filtered.len() >= take {
507    //             let mut new_filter = filter;
508    //             new_filter.latency1 = None;
509    //             return Self::apply_latency_filters(new_filtered, new_filter);
510    //         }
511    //         // Continue with current filtered list
512    //     }
513    //
514    //     // Apply main latency filter
515    //     if let Some(latency) = filter.latency {
516    //         let new_filtered: Vec<_> =
517    //             filtered.iter().filter(|(_, lat, _, _)| lat.unwrap_or(u64::MAX) <= latency).cloned().collect();
518    //
519    //         if new_filtered.len() >= take {
520    //             filtered = new_filtered;
521    //         }
522    //         // Continue with filtered list (either new or original)
523    //     }
524    //
525    //     // Truncate to requested size
526    //     filtered.truncate(take);
527    //     Ok(filtered)
528    // }
529
530    pub async fn update_peer_ping_timestamp(&self, ip: Ipv4Addr, ts_m: u64) {
531        // Update using ConcurrentMap's update method
532        self.peers
533            .update(&ip, |_key, peer| {
534                peer.last_ping = Some(ts_m);
535            })
536            .await;
537    }
538
539    pub async fn update_peer_from_tip(&self, _ctx: &Context, ip: Ipv4Addr, tip: &EventTip) {
540        let current_time_ms = get_unix_millis_now();
541        let temporal: TipInfo = tip.temporal.clone().into();
542        let rooted: TipInfo = tip.rooted.clone().into();
543
544        let updated = self
545            .peers
546            .update(&ip, |_key, peer| {
547                peer.last_seen_ms = current_time_ms;
548                peer.last_msg = current_time_ms;
549                peer.last_msg_type = Some(tip.typename().to_string());
550                peer.temporal = Some(temporal.clone());
551                peer.rooted = Some(rooted.clone());
552            })
553            .await
554            .is_some();
555
556        if !updated {
557            let new_peer = Peer {
558                ip,
559                pk: None, // Will be set during handshake
560                version: None,
561                latency: None,
562                last_msg: current_time_ms,
563                last_seen_ms: current_time_ms,
564                last_ping: None,
565                last_pong: None,
566                shared_secret: None,
567                temporal: Some(temporal),
568                rooted: Some(rooted),
569                last_msg_type: Some(tip.typename().to_string()),
570                handshake_status: HandshakeStatus::None,
571            };
572            self.insert_new_peer(new_peer).await;
573        }
574    }
575
576    pub async fn update_peer_from_pong(&self, ip: Ipv4Addr, pong: &PingReply) {
577        let current_time_ms = get_unix_millis_now();
578        let latency = current_time_ms.saturating_sub(pong.ts_m);
579
580        let updated = self
581            .peers
582            .update(&ip, |_key, peer| {
583                peer.latency = Some(latency);
584                peer.last_pong = Some(current_time_ms);
585                peer.last_seen_ms = current_time_ms;
586                peer.last_msg = current_time_ms;
587                peer.last_msg_type = Some("pong".to_string());
588            })
589            .await
590            .is_some();
591
592        if !updated {
593            // Create new peer if it doesn't exist
594            let new_peer = Peer {
595                ip,
596                pk: None,
597                version: None,
598                latency: Some(latency),
599                last_msg: current_time_ms,
600                last_ping: None,
601                last_pong: Some(current_time_ms),
602                shared_secret: None,
603                temporal: None,
604                rooted: None,
605                last_seen_ms: current_time_ms,
606                last_msg_type: Some("pong".to_string()),
607                handshake_status: HandshakeStatus::None,
608            };
609            self.insert_new_peer(new_peer).await;
610        }
611    }
612
613    /// Update peer activity and last message type
614    pub async fn update_peer_from_proto(&self, ip: Ipv4Addr, last_msg_type: &str) {
615        let current_time_ms = get_unix_millis_now();
616
617        let updated = self
618            .peers
619            .update(&ip, |_key, peer| {
620                peer.last_seen_ms = current_time_ms;
621                peer.last_msg = current_time_ms;
622                peer.last_msg_type = Some(last_msg_type.to_string());
623            })
624            .await
625            .is_some();
626
627        if !updated {
628            // Create new peer if it doesn't exist
629            let new_peer = Peer {
630                ip,
631                pk: None,
632                version: None,
633                latency: None,
634                last_msg: current_time_ms,
635                last_ping: None,
636                last_pong: None,
637                shared_secret: None,
638                temporal: None,
639                rooted: None,
640                last_seen_ms: current_time_ms,
641                last_msg_type: Some(last_msg_type.to_string()),
642                handshake_status: HandshakeStatus::None,
643            };
644            self.insert_new_peer(new_peer).await;
645        }
646    }
647
648    /// Set handshake status for a specific IP
649    pub async fn set_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), Error> {
650        self.peers
651            .update(&ip, |_key, peer| {
652                peer.handshake_status = status.clone();
653            })
654            .await;
655        Ok(())
656    }
657
658    /// Update peer with version and public key information from ANR
659    pub async fn update_peer_from_anr(
660        &self,
661        ip: Ipv4Addr,
662        pk: &PublicKey,
663        version: &Ver,
664        status: Option<HandshakeStatus>,
665    ) {
666        let current_time_ms = get_unix_millis_now();
667
668        let updated = self
669            .peers
670            .update(&ip, |_key, peer| {
671                peer.pk = Some(*pk);
672                peer.version = Some(*version);
673                peer.last_seen_ms = current_time_ms;
674                if let Some(status) = &status {
675                    peer.handshake_status = status.clone();
676                }
677            })
678            .await
679            .is_some();
680
681        // Create new peer if it doesn't exist
682        if !updated {
683            let peer = Peer {
684                ip,
685                pk: Some(*pk),
686                version: Some(*version),
687                latency: None,
688                last_msg: current_time_ms,
689                last_ping: None,
690                last_pong: None,
691                shared_secret: None,
692                temporal: None,
693                rooted: None,
694                last_seen_ms: current_time_ms,
695                last_msg_type: None,
696                handshake_status: status.unwrap_or(HandshakeStatus::None),
697            };
698            let _ = self.peers.insert(ip, peer).await;
699        }
700    }
701
702    /// Returns temporal, rooted and bft heights across peers
703    pub async fn get_heights(&self, trainer_pks: &[PublicKey]) -> Result<(u64, u64, u64), Error> {
704        let mut online_trainers = Vec::new();
705        let mut online_nontrainers = Vec::new();
706
707        let now_ms = get_unix_millis_now();
708        self.peers
709            .scan(|_, peer| {
710                if let Some(pk) = peer.pk
711                    && peer.last_seen_ms > now_ms - 30_000
712                {
713                    if trainer_pks.contains(&pk) {
714                        online_trainers.push(peer.clone());
715                    } else {
716                        online_nontrainers.push(peer.clone());
717                    }
718                }
719            })
720            .await;
721
722        let mut highest_temporal = 0;
723        let mut highest_rooted = 0;
724        for peer in online_nontrainers.iter() {
725            if let Some(temporal_height) = peer.temporal.map(|t| t.height)
726                && let Some(rooted_height) = peer.rooted.map(|t| t.height)
727            {
728                if temporal_height > highest_temporal {
729                    highest_temporal = temporal_height;
730                }
731                if rooted_height > highest_rooted {
732                    highest_rooted = rooted_height;
733                }
734            }
735        }
736
737        let mut trainers_per_height = BTreeMap::new();
738        for peer in online_trainers.iter() {
739            if let Some(temporal_height) = peer.temporal.map(|t| t.height)
740                && let Some(rooted_height) = peer.rooted.map(|t| t.height)
741            {
742                if temporal_height > highest_temporal {
743                    highest_temporal = temporal_height;
744                }
745                if rooted_height > highest_rooted {
746                    highest_rooted = rooted_height;
747                }
748                *trainers_per_height.entry(rooted_height).or_insert(0) += 1;
749            }
750        }
751
752        let mut remaining_to_bft = (online_trainers.len() * 2) / 3;
753        for (height, trainers) in trainers_per_height.into_iter() {
754            remaining_to_bft = remaining_to_bft.saturating_sub(trainers);
755            if remaining_to_bft == 0 {
756                info!(
757                    "Temporal: {} Rooted: {} BFT: {} (2/3 from {} online trainers of {} total)",
758                    highest_temporal,
759                    highest_rooted,
760                    height,
761                    online_trainers.len(),
762                    trainer_pks.len()
763                );
764                return Ok((highest_temporal, highest_rooted, height));
765            }
766        }
767
768        Ok((highest_temporal, highest_rooted, 0))
769    }
770
771    pub async fn get_trainer_ips_above_rooted(
772        &self,
773        height: u64,
774        trainer_pks: &[PublicKey],
775    ) -> Result<Vec<Ipv4Addr>, Error> {
776        let online_trainers_above_temporal: Vec<Ipv4Addr> = self
777            .get_online_trainers(trainer_pks)
778            .await?
779            .into_iter()
780            .filter_map(|peer| {
781                peer.rooted.as_ref().and_then(|rooted| if rooted.height >= height { Some(peer.ip) } else { None })
782            })
783            .collect();
784
785        Ok(online_trainers_above_temporal)
786    }
787
788    pub async fn get_trainer_ips_above_temporal(
789        &self,
790        height: u64,
791        trainer_pks: &[PublicKey],
792    ) -> Result<Vec<Ipv4Addr>, Error> {
793        let online_trainers_above_temporal: Vec<Ipv4Addr> = self
794            .get_online_trainers(trainer_pks)
795            .await?
796            .into_iter()
797            .filter_map(|peer| {
798                peer.temporal.as_ref().and_then(|temporal| if temporal.height >= height { Some(peer.ip) } else { None })
799            })
800            .collect();
801
802        Ok(online_trainers_above_temporal)
803    }
804
805    pub async fn get_online_trainers(&self, trainer_pks: &[PublicKey]) -> Result<Vec<Peer>, Error> {
806        let online_trainers: Vec<Peer> = self
807            .get_online()
808            .await?
809            .into_iter()
810            .filter(|peer| peer.pk.as_ref().map_or(false, |pk| trainer_pks.contains(pk)))
811            .collect();
812        Ok(online_trainers)
813    }
814
815    /// Get peers summary with counts
816    pub async fn get_peers_summary(&self, my_ip: Ipv4Addr, trainer_pks: &[PublicKey]) -> Result<PeersSummary, Error> {
817        let all_peers = self.get_all().await?;
818        let mut online = 0;
819        let mut connecting = 0;
820        let mut trainers = 0;
821        let mut peers_map = HashMap::new();
822
823        for peer in all_peers {
824            if peer.ip == my_ip {
825                continue;
826            }
827
828            match peer.handshake_status {
829                HandshakeStatus::Completed => {
830                    online += 1;
831                    if peer.pk.as_ref().map_or(false, |pk| trainer_pks.contains(pk)) {
832                        trainers += 1;
833                    }
834                }
835                HandshakeStatus::Initiated => connecting += 1,
836                _ => {}
837            }
838
839            let peer_info = PeerInfo {
840                last_ts: peer.last_seen_ms,
841                last_msg: peer.last_msg_type.unwrap_or_else(|| "unknown".to_string()),
842                handshake_status: peer.handshake_status.clone(),
843                version: peer.version,
844                height: peer.temporal.map(|t| t.height).unwrap_or(0),
845                temporal_height: peer.temporal.map(|t| t.height).unwrap_or(0),
846                rooted_height: peer.rooted.map(|r| r.height).unwrap_or(0),
847                latency: peer.latency.unwrap_or(0),
848            };
849            peers_map.insert(peer.ip.to_string(), peer_info);
850        }
851
852        Ok(PeersSummary { online, connecting, trainers, peers: peers_map })
853    }
854}
855
856#[derive(Debug, Clone, Serialize, Deserialize)]
857pub struct PeerInfo {
858    pub last_ts: u64,
859    pub last_msg: String,
860    pub handshake_status: HandshakeStatus,
861    pub version: Option<Ver>,
862    pub height: u64,
863    pub temporal_height: u64,
864    pub rooted_height: u64,
865    pub latency: u64,
866}
867
868#[derive(Debug, Clone, Serialize, Deserialize)]
869pub struct PeersSummary {
870    pub online: usize,
871    pub connecting: usize,
872    pub trainers: usize,
873    pub peers: HashMap<String, PeerInfo>,
874}
875
876#[derive(Debug)]
877pub enum Who {
878    Some(Vec<Ipv4Addr>),
879    Trainers,
880    NotTrainers(usize),
881    Random(usize),
882}
883
884#[derive(Debug, Clone)]
885pub struct HeightFilter {
886    pub min_temporal: Option<u64>,
887    pub min_rooted: Option<u64>,
888    pub take: Option<usize>,
889    pub sort: Option<String>,
890    pub latency: Option<u64>,
891    pub latency1: Option<u64>,
892    pub latency2: Option<u64>,
893}
894
895#[cfg(test)]
896mod tests {
897    use super::*;
898
899    #[tokio::test]
900    async fn test_peer_operations() {
901        let node_peers = NodePeers::new(100);
902        let ip = Ipv4Addr::new(127, 0, 0, 1);
903
904        let ts_m = get_unix_millis_now();
905        let mut test_pk = [0u8; 48];
906        test_pk[0] = 1;
907        test_pk[1] = 2;
908        test_pk[2] = 3;
909
910        let peer = Peer {
911            ip,
912            pk: Some(test_pk.into()),
913            version: Some(Ver::new(1, 0, 0)),
914            latency: Some(100),
915            last_msg: ts_m,
916            last_ping: Some(ts_m),
917            last_pong: None,
918            shared_secret: None,
919            temporal: None,
920            rooted: None,
921            last_seen_ms: ts_m,
922            last_msg_type: Some("ping".to_string()),
923            handshake_status: HandshakeStatus::None,
924        };
925
926        // Test insert
927        assert!(node_peers.insert_new_peer(peer.clone()).await);
928
929        // Test size
930        assert_eq!(node_peers.size().await, 1);
931
932        // Test by_ip
933        let retrieved = node_peers.by_ip(ip).await;
934        assert!(retrieved.is_some());
935        assert_eq!(retrieved.unwrap().ip, ip);
936
937        // Test peer was inserted correctly
938        let retrieved = node_peers.by_ip(ip).await.unwrap();
939        assert_eq!(retrieved.last_msg, ts_m);
940    }
941}