1use crate::config::Config;
2use crate::consensus::doms::EntrySummary;
3use crate::node::anr;
4use crate::node::protocol::{EventTip, PingReply, Typename};
5use crate::utils::misc::get_unix_millis_now;
6use crate::utils::{Hash, PublicKey};
7use crate::{Context, Ver};
8use serde::{Deserialize, Serialize};
9use std::collections::{BTreeMap, HashMap};
10use std::hash::Hash as StdHash;
11use std::net::Ipv4Addr;
12use std::sync::Arc;
13use tokio::sync::RwLock;
14use tokio::task::spawn_blocking;
15use tracing::{info, warn};
16
17#[derive(Debug, Clone, Serialize, Deserialize, PartialEq, Eq)]
19#[serde(rename_all = "snake_case")]
20pub enum HandshakeStatus {
21 None,
23 Initiated,
25 Completed,
27}
28
29#[derive(Debug)]
30struct ConcurrentMap<K, V> {
31 inner: RwLock<HashMap<K, V>>,
32}
33
34impl<K: Eq + StdHash + Clone, V: Clone> ConcurrentMap<K, V> {
35 fn new() -> Self {
36 Self { inner: RwLock::new(HashMap::new()) }
37 }
38 async fn len(&self) -> usize {
39 self.inner.read().await.len()
40 }
41 async fn insert(&self, key: K, value: V) -> Result<(), ()> {
42 let mut map = self.inner.write().await;
43 if map.contains_key(&key) {
44 Err(())
45 } else {
46 map.insert(key, value);
47 Ok(())
48 }
49 }
50 async fn remove(&self, key: &K) -> Option<V> {
51 self.inner.write().await.remove(key)
52 }
53 async fn read<R>(&self, key: &K, f: impl FnOnce(&K, &V) -> R) -> Option<R> {
54 let v = {
55 let map = self.inner.read().await;
56 map.get(key).cloned()
57 };
58 v.as_ref().map(|vv| f(key, vv))
59 }
60 async fn scan(&self, mut f: impl FnMut(&K, &V)) {
61 let snapshot: Vec<(K, V)> = {
62 let map = self.inner.read().await;
63 map.iter().map(|(k, v)| (k.clone(), v.clone())).collect()
64 };
65 for (k, v) in snapshot.iter() {
66 f(k, v);
67 }
68 }
69 async fn update<R>(&self, key: &K, mut f: impl FnMut(&K, &mut V) -> R) -> Option<R> {
70 let mut map = self.inner.write().await;
71 if let Some(v) = map.get_mut(key) { Some(f(key, v)) } else { None }
72 }
73}
74
75#[derive(Debug, thiserror::Error, strum_macros::IntoStaticStr)]
76pub enum Error {
77 #[error(transparent)]
78 Anr(#[from] anr::Error),
79}
80
81impl crate::utils::misc::Typename for Error {
82 fn typename(&self) -> &'static str {
83 self.into()
84 }
85}
86
87#[derive(Debug, Clone, Serialize, Deserialize)]
88pub struct Peer {
89 pub ip: Ipv4Addr,
90 pub pk: Option<PublicKey>,
91 pub version: Option<Ver>,
92 pub latency: Option<u64>,
93 pub last_msg: u64,
94 pub last_ping: Option<u64>,
95 pub last_pong: Option<u64>,
96 pub shared_secret: Option<Vec<u8>>,
97 pub temporal: Option<TipInfo>,
98 pub rooted: Option<TipInfo>,
99 pub last_seen_ms: u64,
100 pub last_msg_type: Option<String>,
101 pub handshake_status: HandshakeStatus,
102}
103
104impl Peer {
105 pub fn is_handshaked(&self) -> bool {
107 self.handshake_status == HandshakeStatus::Completed
108 }
109
110 pub fn is_online(&self) -> bool {
112 get_unix_millis_now().saturating_sub(self.last_ping.unwrap_or_default()) <= 6_000
115 }
116}
117
118#[derive(Debug, Copy, Clone, Serialize, Deserialize)]
119pub struct TipInfo {
120 pub height: u64,
121 pub prev_hash: Hash,
122}
123
124impl From<EntrySummary> for TipInfo {
125 fn from(summary: EntrySummary) -> Self {
126 Self { height: summary.header.height, prev_hash: summary.header.prev_hash }
127 }
128}
129
130#[derive(Debug, Clone, Serialize, Deserialize)]
131pub struct HeaderInfo {
132 pub height: u64,
133 pub prev_hash: Hash,
134}
135
136#[derive(Debug, Clone)]
138pub struct NodePeers {
139 peers: Arc<ConcurrentMap<Ipv4Addr, Peer>>,
140 max_peers: usize,
141}
142
143impl NodePeers {
144 pub fn new(max_peers: usize) -> Self {
146 Self { peers: Arc::new(ConcurrentMap::new()), max_peers }
147 }
148
149 pub fn default() -> Self {
151 Self::new(100)
152 }
153
154 pub async fn clear_stale(&self, fabric: &crate::consensus::fabric::Fabric, node_registry: &anr::NodeAnrs) -> usize {
155 self.clear_stale_inner(fabric, node_registry)
156 .await
157 .inspect_err(|e| warn!("peer cleanup error: {}", e))
158 .unwrap_or(0)
159 }
160
161 pub async fn clear_stale_inner(
163 &self,
164 fabric: &crate::consensus::fabric::Fabric,
165 node_registry: &anr::NodeAnrs,
166 ) -> Result<usize, Error> {
167 let ts_m = get_unix_millis_now();
168 let height = fabric.get_temporal_height_or_0();
169 let validators = fabric.trainers_for_height(height + 1).unwrap_or_default();
170 let validators_vec: Vec<Vec<u8>> = validators.iter().map(|pk| pk.to_vec()).collect();
171
172 let validator_anr_ips = node_registry.by_pks_ip(&validators_vec).await;
173 let validators_map: std::collections::HashSet<&PublicKey> = validators.iter().collect();
174
175 let handshaked_ips = node_registry.get_all_handshaked_ip4().await;
176
177 let mut cur_ips = Vec::new();
178 let mut cur_val_ips = Vec::new();
179
180 let mut to_remove = Vec::new();
182 self.peers
183 .scan(|ip, peer| {
184 if ts_m > (peer.last_msg + 60_000) {
186 to_remove.push(*ip);
187 return;
188 }
189
190 if let Some(ref pk) = peer.pk {
191 if validators_map.contains(pk) {
192 cur_val_ips.push(*ip);
193 } else {
194 cur_ips.push(*ip);
195 }
196 } else {
197 cur_ips.push(*ip);
198 }
199 })
200 .await;
201
202 let cleared_count = to_remove.len();
204 for ip in to_remove {
205 let _ = self.peers.remove(&ip).await;
206 }
207
208 let missing_vals: Vec<_> = validator_anr_ips.iter().filter(|ip| !cur_val_ips.contains(ip)).cloned().collect();
210
211 let missing_ips: Vec<_> = handshaked_ips.into_iter().filter(|ip| !cur_ips.contains(ip)).collect();
212
213 let add_size = self
215 .max_peers
216 .saturating_sub(self.size().await)
217 .saturating_sub(cur_val_ips.len())
218 .saturating_sub(missing_vals.len());
219
220 let missing_ips = spawn_blocking(move || {
221 let mut missing_ips = missing_ips;
223 use rand::seq::SliceRandom;
224 let mut rng = rand::rng();
225 missing_ips.shuffle(&mut rng);
226 missing_ips.truncate(add_size);
227 missing_ips
228 })
229 .await
230 .unwrap_or_default();
231
232 for ip in missing_vals.iter().chain(missing_ips.iter()) {
234 let mut handshake_status = HandshakeStatus::None;
236 let anrs = node_registry.get_all().await;
237
238 for anr in anrs {
239 if anr.ip4 == *ip {
240 handshake_status = if anr.handshaked { HandshakeStatus::Completed } else { HandshakeStatus::None };
241 break;
242 }
243 }
244
245 let _ = self
246 .insert_new_peer(Peer {
247 ip: *ip,
248 pk: None,
249 version: None,
250 latency: None,
251 last_msg: ts_m,
252 last_ping: None,
253 last_pong: None,
254 shared_secret: None,
255 temporal: None,
256 rooted: None,
257 last_seen_ms: ts_m,
258 last_msg_type: None,
259 handshake_status,
260 })
261 .await;
262 }
263
264 Ok(cleared_count)
265 }
266
267 pub async fn insert_new_peer(&self, mut peer: Peer) -> bool {
269 if peer.last_msg == 0 {
270 peer.last_msg = get_unix_millis_now();
271 }
272
273 self.peers.insert(peer.ip, peer).await.is_ok()
274 }
275
276 pub async fn seed(
278 &self,
279 fabric: &crate::consensus::fabric::Fabric,
280 config: &Config,
281 node_anrs: &anr::NodeAnrs,
282 ) -> Result<(), Error> {
283 let height = fabric.get_temporal_height_or_0();
284 let validators = fabric.trainers_for_height(height + 1).unwrap_or_default();
285 let validators: Vec<Vec<u8>> = validators.iter().map(|pk| pk.to_vec()).collect();
286
287 let validator_ips: Vec<_> =
288 node_anrs.by_pks_ip(&validators).await.into_iter().filter(|ip| *ip != config.get_public_ipv4()).collect();
289
290 let ts_m = get_unix_millis_now();
291 for ip in validator_ips {
292 let _ = self.insert_new_peer(Peer {
293 ip,
294 pk: None,
295 version: None,
296 latency: None,
297 last_msg: ts_m,
298 last_ping: None,
299 last_pong: None,
300 shared_secret: None,
301 temporal: None,
302 rooted: None,
303 last_seen_ms: ts_m,
304 last_msg_type: None,
305 handshake_status: HandshakeStatus::None,
306 });
307 }
308
309 Ok(())
310 }
311
312 pub async fn size(&self) -> usize {
314 self.peers.len().await
315 }
316
317 pub async fn random(&self, no: usize) -> Result<Vec<Peer>, Error> {
319 let online_peers = self.get_online().await?;
320 if online_peers.is_empty() {
321 return Ok(vec![]);
322 }
323
324 use rand::seq::SliceRandom;
325 let mut rng = rand::rng();
326 let mut peers = online_peers;
327 peers.shuffle(&mut rng);
328 peers.truncate(no);
329
330 Ok(peers)
331 }
332
333 pub async fn get_all(&self) -> Result<Vec<Peer>, Error> {
335 let mut peers = Vec::new();
336 self.peers
337 .scan(|_, peer| {
338 peers.push(peer.clone());
339 })
340 .await;
341 Ok(peers)
342 }
343
344 pub async fn get_online(&self) -> Result<Vec<Peer>, Error> {
346 let mut online_peers = Vec::new();
347
348 self.peers
349 .scan(|_, peer| {
350 if peer.is_online() {
351 online_peers.push(peer.clone());
352 }
353 })
354 .await;
355
356 Ok(online_peers)
357 }
358
359 pub async fn get_online_ip_l_th_rh(&self) -> Result<Vec<(Ipv4Addr, Option<u64>, Option<u64>, Option<u64>)>, Error> {
361 let online_peers = self.get_online().await?;
362 let mut summary = Vec::new();
363
364 for peer in online_peers {
365 let temporal_height = peer.temporal.as_ref().map(|t| t.height);
366 let rooted_height = peer.rooted.as_ref().map(|r| r.height);
367
368 summary.push((peer.ip, peer.latency, temporal_height, rooted_height));
369 }
370
371 Ok(summary)
372 }
373
374 pub async fn get_shared_secret(&self, pk: &[u8]) -> Result<Vec<u8>, Error> {
376 if pk.is_empty() {
377 return Ok(vec![]);
378 }
379
380 let mut found_secret = None;
381 self.peers
382 .scan(|_, peer| {
383 if let Some(ref peer_pk) = peer.pk {
384 if peer_pk == pk {
385 found_secret = peer.shared_secret.clone();
386 }
387 }
388 })
389 .await;
390
391 Ok(found_secret.unwrap_or_default())
393 }
394
395 pub async fn by_ip(&self, ip: Ipv4Addr) -> Option<Peer> {
397 self.peers.read(&ip, |_, peer| peer.clone()).await
398 }
399
400 pub async fn by_pks(&self, pks: &[Vec<u8>]) -> Result<Vec<Peer>, Error> {
402 let mut peers = Vec::new();
403
404 self.peers
405 .scan(|_, peer| {
406 if let Some(ref peer_pk) = peer.pk {
407 if pks.iter().any(|pk| pk.as_slice() == peer_pk.as_slice()) {
409 peers.push(peer.clone());
410 }
411 }
412 })
413 .await;
414
415 Ok(peers)
416 }
417
418 pub async fn for_height(&self, fabric: &crate::consensus::fabric::Fabric, height: u64) -> Result<Vec<Peer>, Error> {
420 let trainers = fabric.trainers_for_height(height).unwrap_or_default();
421 let trainers_set: std::collections::HashSet<&PublicKey> = trainers.iter().collect();
422 let mut peers = Vec::new();
423
424 self.peers
425 .scan(|_, peer| {
426 if let Some(ref pk) = peer.pk {
427 if trainers_set.contains(pk) {
428 peers.push(peer.clone());
429 }
430 }
431 })
432 .await;
433
434 Ok(peers)
435 }
436
437 pub async fn by_who(&self, fabric: &crate::consensus::fabric::Fabric, who: Who) -> Result<Vec<Ipv4Addr>, Error> {
439 match who {
440 Who::Some(peer_ips) => Ok(peer_ips),
441 Who::Trainers => {
442 let height = fabric.get_temporal_height_or_0();
443 let trainer_peers = self.for_height(fabric, height + 1).await?;
444 let mut ips: Vec<_> = trainer_peers.iter().map(|p| p.ip).collect();
445
446 if ips.is_empty() {
447 return Ok(vec![]);
448 }
449
450 use rand::seq::SliceRandom;
451 let mut rng = rand::rng();
452 ips.shuffle(&mut rng);
453 Ok(ips)
454 }
455 Who::NotTrainers(cnt) => {
456 let height = fabric.get_temporal_height_or_0();
457 let trainer_peers = self.for_height(fabric, height + 1).await?;
458 let trainer_ips: std::collections::HashSet<_> = trainer_peers.iter().map(|p| p.ip).collect();
459
460 let all_peers = self.get_all().await?;
461 let not_trainer_ips: Vec<_> =
462 all_peers.iter().map(|p| p.ip).filter(|ip| !trainer_ips.contains(ip)).collect();
463
464 if not_trainer_ips.is_empty() {
465 return Ok(vec![]);
466 }
467
468 use rand::seq::SliceRandom;
469 let mut rng = rand::rng();
470 let mut ips = not_trainer_ips;
471 ips.shuffle(&mut rng);
472 ips.truncate(cnt);
473 Ok(ips)
474 }
475 Who::Random(no) => {
476 let random_peers = self.random(no).await?;
477 Ok(random_peers.iter().map(|p| p.ip).collect())
478 }
479 }
480 }
481
482 pub async fn update_peer_ping_timestamp(&self, ip: Ipv4Addr, ts_m: u64) {
531 self.peers
533 .update(&ip, |_key, peer| {
534 peer.last_ping = Some(ts_m);
535 })
536 .await;
537 }
538
539 pub async fn update_peer_from_tip(&self, _ctx: &Context, ip: Ipv4Addr, tip: &EventTip) {
540 let current_time_ms = get_unix_millis_now();
541 let temporal: TipInfo = tip.temporal.clone().into();
542 let rooted: TipInfo = tip.rooted.clone().into();
543
544 let updated = self
545 .peers
546 .update(&ip, |_key, peer| {
547 peer.last_seen_ms = current_time_ms;
548 peer.last_msg = current_time_ms;
549 peer.last_msg_type = Some(tip.typename().to_string());
550 peer.temporal = Some(temporal.clone());
551 peer.rooted = Some(rooted.clone());
552 })
553 .await
554 .is_some();
555
556 if !updated {
557 let new_peer = Peer {
558 ip,
559 pk: None, version: None,
561 latency: None,
562 last_msg: current_time_ms,
563 last_seen_ms: current_time_ms,
564 last_ping: None,
565 last_pong: None,
566 shared_secret: None,
567 temporal: Some(temporal),
568 rooted: Some(rooted),
569 last_msg_type: Some(tip.typename().to_string()),
570 handshake_status: HandshakeStatus::None,
571 };
572 self.insert_new_peer(new_peer).await;
573 }
574 }
575
576 pub async fn update_peer_from_pong(&self, ip: Ipv4Addr, pong: &PingReply) {
577 let current_time_ms = get_unix_millis_now();
578 let latency = current_time_ms.saturating_sub(pong.ts_m);
579
580 let updated = self
581 .peers
582 .update(&ip, |_key, peer| {
583 peer.latency = Some(latency);
584 peer.last_pong = Some(current_time_ms);
585 peer.last_seen_ms = current_time_ms;
586 peer.last_msg = current_time_ms;
587 peer.last_msg_type = Some("pong".to_string());
588 })
589 .await
590 .is_some();
591
592 if !updated {
593 let new_peer = Peer {
595 ip,
596 pk: None,
597 version: None,
598 latency: Some(latency),
599 last_msg: current_time_ms,
600 last_ping: None,
601 last_pong: Some(current_time_ms),
602 shared_secret: None,
603 temporal: None,
604 rooted: None,
605 last_seen_ms: current_time_ms,
606 last_msg_type: Some("pong".to_string()),
607 handshake_status: HandshakeStatus::None,
608 };
609 self.insert_new_peer(new_peer).await;
610 }
611 }
612
613 pub async fn update_peer_from_proto(&self, ip: Ipv4Addr, last_msg_type: &str) {
615 let current_time_ms = get_unix_millis_now();
616
617 let updated = self
618 .peers
619 .update(&ip, |_key, peer| {
620 peer.last_seen_ms = current_time_ms;
621 peer.last_msg = current_time_ms;
622 peer.last_msg_type = Some(last_msg_type.to_string());
623 })
624 .await
625 .is_some();
626
627 if !updated {
628 let new_peer = Peer {
630 ip,
631 pk: None,
632 version: None,
633 latency: None,
634 last_msg: current_time_ms,
635 last_ping: None,
636 last_pong: None,
637 shared_secret: None,
638 temporal: None,
639 rooted: None,
640 last_seen_ms: current_time_ms,
641 last_msg_type: Some(last_msg_type.to_string()),
642 handshake_status: HandshakeStatus::None,
643 };
644 self.insert_new_peer(new_peer).await;
645 }
646 }
647
648 pub async fn set_handshake_status(&self, ip: Ipv4Addr, status: HandshakeStatus) -> Result<(), Error> {
650 self.peers
651 .update(&ip, |_key, peer| {
652 peer.handshake_status = status.clone();
653 })
654 .await;
655 Ok(())
656 }
657
658 pub async fn update_peer_from_anr(
660 &self,
661 ip: Ipv4Addr,
662 pk: &PublicKey,
663 version: &Ver,
664 status: Option<HandshakeStatus>,
665 ) {
666 let current_time_ms = get_unix_millis_now();
667
668 let updated = self
669 .peers
670 .update(&ip, |_key, peer| {
671 peer.pk = Some(*pk);
672 peer.version = Some(*version);
673 peer.last_seen_ms = current_time_ms;
674 if let Some(status) = &status {
675 peer.handshake_status = status.clone();
676 }
677 })
678 .await
679 .is_some();
680
681 if !updated {
683 let peer = Peer {
684 ip,
685 pk: Some(*pk),
686 version: Some(*version),
687 latency: None,
688 last_msg: current_time_ms,
689 last_ping: None,
690 last_pong: None,
691 shared_secret: None,
692 temporal: None,
693 rooted: None,
694 last_seen_ms: current_time_ms,
695 last_msg_type: None,
696 handshake_status: status.unwrap_or(HandshakeStatus::None),
697 };
698 let _ = self.peers.insert(ip, peer).await;
699 }
700 }
701
702 pub async fn get_heights(&self, trainer_pks: &[PublicKey]) -> Result<(u64, u64, u64), Error> {
704 let mut online_trainers = Vec::new();
705 let mut online_nontrainers = Vec::new();
706
707 let now_ms = get_unix_millis_now();
708 self.peers
709 .scan(|_, peer| {
710 if let Some(pk) = peer.pk
711 && peer.last_seen_ms > now_ms - 30_000
712 {
713 if trainer_pks.contains(&pk) {
714 online_trainers.push(peer.clone());
715 } else {
716 online_nontrainers.push(peer.clone());
717 }
718 }
719 })
720 .await;
721
722 let mut highest_temporal = 0;
723 let mut highest_rooted = 0;
724 for peer in online_nontrainers.iter() {
725 if let Some(temporal_height) = peer.temporal.map(|t| t.height)
726 && let Some(rooted_height) = peer.rooted.map(|t| t.height)
727 {
728 if temporal_height > highest_temporal {
729 highest_temporal = temporal_height;
730 }
731 if rooted_height > highest_rooted {
732 highest_rooted = rooted_height;
733 }
734 }
735 }
736
737 let mut trainers_per_height = BTreeMap::new();
738 for peer in online_trainers.iter() {
739 if let Some(temporal_height) = peer.temporal.map(|t| t.height)
740 && let Some(rooted_height) = peer.rooted.map(|t| t.height)
741 {
742 if temporal_height > highest_temporal {
743 highest_temporal = temporal_height;
744 }
745 if rooted_height > highest_rooted {
746 highest_rooted = rooted_height;
747 }
748 *trainers_per_height.entry(rooted_height).or_insert(0) += 1;
749 }
750 }
751
752 let mut remaining_to_bft = (online_trainers.len() * 2) / 3;
753 for (height, trainers) in trainers_per_height.into_iter() {
754 remaining_to_bft = remaining_to_bft.saturating_sub(trainers);
755 if remaining_to_bft == 0 {
756 info!(
757 "Temporal: {} Rooted: {} BFT: {} (2/3 from {} online trainers of {} total)",
758 highest_temporal,
759 highest_rooted,
760 height,
761 online_trainers.len(),
762 trainer_pks.len()
763 );
764 return Ok((highest_temporal, highest_rooted, height));
765 }
766 }
767
768 Ok((highest_temporal, highest_rooted, 0))
769 }
770
771 pub async fn get_trainer_ips_above_rooted(
772 &self,
773 height: u64,
774 trainer_pks: &[PublicKey],
775 ) -> Result<Vec<Ipv4Addr>, Error> {
776 let online_trainers_above_temporal: Vec<Ipv4Addr> = self
777 .get_online_trainers(trainer_pks)
778 .await?
779 .into_iter()
780 .filter_map(|peer| {
781 peer.rooted.as_ref().and_then(|rooted| if rooted.height >= height { Some(peer.ip) } else { None })
782 })
783 .collect();
784
785 Ok(online_trainers_above_temporal)
786 }
787
788 pub async fn get_trainer_ips_above_temporal(
789 &self,
790 height: u64,
791 trainer_pks: &[PublicKey],
792 ) -> Result<Vec<Ipv4Addr>, Error> {
793 let online_trainers_above_temporal: Vec<Ipv4Addr> = self
794 .get_online_trainers(trainer_pks)
795 .await?
796 .into_iter()
797 .filter_map(|peer| {
798 peer.temporal.as_ref().and_then(|temporal| if temporal.height >= height { Some(peer.ip) } else { None })
799 })
800 .collect();
801
802 Ok(online_trainers_above_temporal)
803 }
804
805 pub async fn get_online_trainers(&self, trainer_pks: &[PublicKey]) -> Result<Vec<Peer>, Error> {
806 let online_trainers: Vec<Peer> = self
807 .get_online()
808 .await?
809 .into_iter()
810 .filter(|peer| peer.pk.as_ref().map_or(false, |pk| trainer_pks.contains(pk)))
811 .collect();
812 Ok(online_trainers)
813 }
814
815 pub async fn get_peers_summary(&self, my_ip: Ipv4Addr, trainer_pks: &[PublicKey]) -> Result<PeersSummary, Error> {
817 let all_peers = self.get_all().await?;
818 let mut online = 0;
819 let mut connecting = 0;
820 let mut trainers = 0;
821 let mut peers_map = HashMap::new();
822
823 for peer in all_peers {
824 if peer.ip == my_ip {
825 continue;
826 }
827
828 match peer.handshake_status {
829 HandshakeStatus::Completed => {
830 online += 1;
831 if peer.pk.as_ref().map_or(false, |pk| trainer_pks.contains(pk)) {
832 trainers += 1;
833 }
834 }
835 HandshakeStatus::Initiated => connecting += 1,
836 _ => {}
837 }
838
839 let peer_info = PeerInfo {
840 last_ts: peer.last_seen_ms,
841 last_msg: peer.last_msg_type.unwrap_or_else(|| "unknown".to_string()),
842 handshake_status: peer.handshake_status.clone(),
843 version: peer.version,
844 height: peer.temporal.map(|t| t.height).unwrap_or(0),
845 temporal_height: peer.temporal.map(|t| t.height).unwrap_or(0),
846 rooted_height: peer.rooted.map(|r| r.height).unwrap_or(0),
847 latency: peer.latency.unwrap_or(0),
848 };
849 peers_map.insert(peer.ip.to_string(), peer_info);
850 }
851
852 Ok(PeersSummary { online, connecting, trainers, peers: peers_map })
853 }
854}
855
856#[derive(Debug, Clone, Serialize, Deserialize)]
857pub struct PeerInfo {
858 pub last_ts: u64,
859 pub last_msg: String,
860 pub handshake_status: HandshakeStatus,
861 pub version: Option<Ver>,
862 pub height: u64,
863 pub temporal_height: u64,
864 pub rooted_height: u64,
865 pub latency: u64,
866}
867
868#[derive(Debug, Clone, Serialize, Deserialize)]
869pub struct PeersSummary {
870 pub online: usize,
871 pub connecting: usize,
872 pub trainers: usize,
873 pub peers: HashMap<String, PeerInfo>,
874}
875
/// Selector describing which peers `by_who` should resolve to.
#[derive(Debug)]
pub enum Who {
    /// Exactly these addresses.
    Some(Vec<Ipv4Addr>),
    /// Tracked trainers for the next height.
    Trainers,
    /// Up to this many tracked peers that are not trainers.
    NotTrainers(usize),
    /// Up to this many random online peers.
    Random(usize),
}
883
/// Optional filter criteria for selecting peers.
///
/// NOTE(review): not used by any code visible in this file; the field
/// descriptions below are inferred from the names — confirm at the call
/// sites.
#[derive(Debug, Clone)]
pub struct HeightFilter {
    /// Presumably a lower bound on the temporal height.
    pub min_temporal: Option<u64>,
    /// Presumably a lower bound on the rooted height.
    pub min_rooted: Option<u64>,
    /// Presumably the maximum number of peers to return.
    pub take: Option<usize>,
    /// Presumably the name of the sort key.
    pub sort: Option<String>,
    /// Latency threshold; exact semantics not visible here.
    pub latency: Option<u64>,
    /// Additional latency threshold; exact semantics not visible here.
    pub latency1: Option<u64>,
    /// Additional latency threshold; exact semantics not visible here.
    pub latency2: Option<u64>,
}
894
#[cfg(test)]
mod tests {
    use super::*;

    /// Inserts one peer and checks that it is counted and can be fetched
    /// back by IP with its `last_msg` intact.
    #[tokio::test]
    async fn test_peer_operations() {
        let registry = NodePeers::new(100);
        let addr = Ipv4Addr::new(127, 0, 0, 1);
        let now_ms = get_unix_millis_now();

        let mut pk_bytes = [0u8; 48];
        pk_bytes[..3].copy_from_slice(&[1, 2, 3]);

        let sample = Peer {
            ip: addr,
            pk: Some(pk_bytes.into()),
            version: Some(Ver::new(1, 0, 0)),
            latency: Some(100),
            last_msg: now_ms,
            last_ping: Some(now_ms),
            last_pong: None,
            shared_secret: None,
            temporal: None,
            rooted: None,
            last_seen_ms: now_ms,
            last_msg_type: Some("ping".to_string()),
            handshake_status: HandshakeStatus::None,
        };

        assert!(registry.insert_new_peer(sample.clone()).await);
        assert_eq!(registry.size().await, 1);

        let first_lookup = registry.by_ip(addr).await;
        assert!(first_lookup.is_some());
        assert_eq!(first_lookup.unwrap().ip, addr);

        let second_lookup = registry.by_ip(addr).await.unwrap();
        assert_eq!(second_lookup.last_msg, now_ms);
    }
}