1use dashmap::DashMap;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::net::IpAddr;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14pub type NodeId = [u8; 32];
16
17#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum Region {
20 NorthAmerica(String),
22 SouthAmerica(String),
24 Europe(String),
26 AsiaPacific(String),
28 MiddleEast(String),
30 Africa(String),
32 Custom(String),
34}
35
36impl Region {
37 pub fn continent(&self) -> &'static str {
39 match self {
40 Region::NorthAmerica(_) => "north_america",
41 Region::SouthAmerica(_) => "south_america",
42 Region::Europe(_) => "europe",
43 Region::AsiaPacific(_) => "asia_pacific",
44 Region::MiddleEast(_) => "middle_east",
45 Region::Africa(_) => "africa",
46 Region::Custom(_) => "custom",
47 }
48 }
49
50 pub fn zone(&self) -> &str {
52 match self {
53 Region::NorthAmerica(z)
54 | Region::SouthAmerica(z)
55 | Region::Europe(z)
56 | Region::AsiaPacific(z)
57 | Region::MiddleEast(z)
58 | Region::Africa(z)
59 | Region::Custom(z) => z,
60 }
61 }
62}
63
64#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
66pub struct LocationInfo {
67 pub region: Region,
69 pub zone: Option<String>,
71 pub latitude: Option<f64>,
73 pub longitude: Option<f64>,
75 pub asn: Option<u32>,
77 pub provider: Option<String>,
79 pub datacenter: Option<String>,
81 pub country_code: Option<String>,
83 pub city: Option<String>,
85}
86
87impl LocationInfo {
88 pub fn new(region: Region) -> Self {
90 Self {
91 region,
92 zone: None,
93 latitude: None,
94 longitude: None,
95 asn: None,
96 provider: None,
97 datacenter: None,
98 country_code: None,
99 city: None,
100 }
101 }
102
103 pub fn with_coordinates(mut self, lat: f64, lon: f64) -> Self {
105 self.latitude = Some(lat.clamp(-90.0, 90.0));
106 self.longitude = Some(lon.clamp(-180.0, 180.0));
107 self
108 }
109
110 pub fn with_asn(mut self, asn: u32) -> Self {
112 self.asn = Some(asn);
113 self
114 }
115
116 pub fn with_provider(mut self, provider: impl Into<String>) -> Self {
118 self.provider = Some(provider.into());
119 self
120 }
121
122 pub fn distance_to(&self, other: &LocationInfo) -> Option<f64> {
125 let (lat1, lon1) = (self.latitude?, self.longitude?);
126 let (lat2, lon2) = (other.latitude?, other.longitude?);
127
128 let r = 6371.0; let d_lat = (lat2 - lat1).to_radians();
130 let d_lon = (lon2 - lon1).to_radians();
131 let lat1_rad = lat1.to_radians();
132 let lat2_rad = lat2.to_radians();
133
134 let a = (d_lat / 2.0).sin().powi(2)
135 + lat1_rad.cos() * lat2_rad.cos() * (d_lon / 2.0).sin().powi(2);
136 let c = 2.0 * a.sqrt().asin();
137
138 Some(r * c)
139 }
140
141 pub fn same_continent(&self, other: &LocationInfo) -> bool {
143 self.region.continent() == other.region.continent()
144 }
145
146 pub fn same_region(&self, other: &LocationInfo) -> bool {
148 self.region == other.region
149 }
150}
151
152#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
154pub enum NatType {
155 None,
157 FullCone,
159 RestrictedCone,
161 PortRestrictedCone,
163 Symmetric,
165 Unknown,
167}
168
169impl NatType {
170 pub fn difficulty(&self) -> u8 {
172 match self {
173 NatType::None => 0,
174 NatType::FullCone => 1,
175 NatType::RestrictedCone => 2,
176 NatType::PortRestrictedCone => 3,
177 NatType::Symmetric => 4,
178 NatType::Unknown => 3,
179 }
180 }
181
182 pub fn can_connect_direct(&self, other: &NatType) -> bool {
184 self.difficulty() + other.difficulty() < 5
186 }
187}
188
189#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
191pub enum NetworkTier {
192 Edge = 0,
194 Consumer = 1,
196 Business = 2,
198 Datacenter = 3,
200 Premium = 4,
202 Core = 5,
204}
205
206impl NetworkTier {
207 pub fn priority(&self) -> u8 {
209 *self as u8
210 }
211}
212
213#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
215pub struct TopologyHints {
216 pub preferred_peers: Vec<NodeId>,
218 pub tier: NetworkTier,
220 pub uplink_gbps: Option<u32>,
222 pub downlink_gbps: Option<u32>,
224 pub nat_type: NatType,
226 pub can_relay: bool,
228 pub max_relay_connections: Option<u32>,
230 pub public_addresses: Vec<IpAddr>,
232 pub reflexive_address: Option<IpAddr>,
234 #[serde(skip)]
236 pub peer_latencies: HashMap<NodeId, u32>,
237 pub hop_distances: HashMap<String, u8>,
239}
240
241impl Default for TopologyHints {
242 fn default() -> Self {
243 Self {
244 preferred_peers: Vec::new(),
245 tier: NetworkTier::Consumer,
246 uplink_gbps: None,
247 downlink_gbps: None,
248 nat_type: NatType::Unknown,
249 can_relay: false,
250 max_relay_connections: None,
251 public_addresses: Vec::new(),
252 reflexive_address: None,
253 peer_latencies: HashMap::new(),
254 hop_distances: HashMap::new(),
255 }
256 }
257}
258
259impl TopologyHints {
260 pub fn new(tier: NetworkTier) -> Self {
262 Self {
263 tier,
264 ..Default::default()
265 }
266 }
267
268 pub fn with_bandwidth(mut self, uplink: u32, downlink: u32) -> Self {
270 self.uplink_gbps = Some(uplink);
271 self.downlink_gbps = Some(downlink);
272 self
273 }
274
275 pub fn with_nat(mut self, nat_type: NatType) -> Self {
277 self.nat_type = nat_type;
278 self
279 }
280
281 pub fn with_relay(mut self, max_connections: u32) -> Self {
283 self.can_relay = true;
284 self.max_relay_connections = Some(max_connections);
285 self
286 }
287
288 pub fn add_preferred_peer(&mut self, peer: NodeId) {
290 if !self.preferred_peers.contains(&peer) {
291 self.preferred_peers.push(peer);
292 }
293 }
294
295 pub fn update_latency(&mut self, peer: NodeId, latency_ms: u32) {
297 self.peer_latencies.insert(peer, latency_ms);
298 }
299
300 pub fn average_latency(&self) -> Option<f64> {
302 if self.peer_latencies.is_empty() {
303 return None;
304 }
305 let sum: u64 = self.peer_latencies.values().map(|&v| v as u64).sum();
306 Some(sum as f64 / self.peer_latencies.len() as f64)
307 }
308
309 pub fn connectivity_score(&self) -> f64 {
311 let mut score = 0.0;
312
313 score += (self.tier.priority() as f64) * 0.06;
315
316 score += (4 - self.nat_type.difficulty()) as f64 * 0.05;
318
319 if !self.public_addresses.is_empty() {
321 score += 0.1;
322 }
323
324 if let Some(uplink) = self.uplink_gbps {
326 score += (uplink.min(1000) as f64 / 1000.0) * 0.2;
327 }
328
329 if self.can_relay {
331 score += 0.1;
332 }
333
334 score.min(1.0)
335 }
336}
337
338#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
340pub enum NodeStatus {
341 Online,
343 Degraded,
345 Draining,
347 Maintenance,
349 Offline,
351 Starting,
353 ShuttingDown,
355}
356
357impl NodeStatus {
358 pub fn accepts_work(&self) -> bool {
360 matches!(self, NodeStatus::Online | NodeStatus::Degraded)
361 }
362
363 pub fn is_reachable(&self) -> bool {
365 !matches!(self, NodeStatus::Offline)
366 }
367
368 pub fn routing_priority(&self) -> u8 {
370 match self {
371 NodeStatus::Online => 5,
372 NodeStatus::Degraded => 3,
373 NodeStatus::Draining => 1,
374 NodeStatus::ShuttingDown => 0,
375 NodeStatus::Starting => 2,
376 NodeStatus::Maintenance => 0,
377 NodeStatus::Offline => 0,
378 }
379 }
380}
381
382pub const MAX_METADATA_STRING_LEN: usize = 1024;
388
389pub const MAX_METADATA_TAGS: usize = 256;
395
396pub const MAX_METADATA_CUSTOM_ENTRIES: usize = 256;
399
400pub const MAX_PREFERRED_PEERS: usize = 4096;
405
406pub const MAX_HOP_DISTANCES: usize = 4096;
408
409pub const MAX_PUBLIC_ADDRESSES: usize = 256;
412
413#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
415pub struct NodeMetadata {
416 pub node_id: NodeId,
418 pub name: Option<String>,
420 pub description: Option<String>,
422 pub owner: Option<String>,
424 pub location: Option<LocationInfo>,
426 pub topology: TopologyHints,
428 pub status: NodeStatus,
430 pub custom: HashMap<String, String>,
432 pub version: u64,
434 pub updated_at: u64,
436 pub created_at: u64,
438 pub tags: HashSet<String>,
440 pub roles: HashSet<String>,
442}
443
444impl NodeMetadata {
445 pub fn validate_bounds(&self) -> Result<(), MetadataError> {
458 if let Some(name) = &self.name {
459 if name.len() > MAX_METADATA_STRING_LEN {
460 return Err(MetadataError::Invalid(format!(
461 "name exceeds {} bytes",
462 MAX_METADATA_STRING_LEN
463 )));
464 }
465 }
466 if let Some(d) = &self.description {
467 if d.len() > MAX_METADATA_STRING_LEN {
468 return Err(MetadataError::Invalid(format!(
469 "description exceeds {} bytes",
470 MAX_METADATA_STRING_LEN
471 )));
472 }
473 }
474 if let Some(o) = &self.owner {
475 if o.len() > MAX_METADATA_STRING_LEN {
476 return Err(MetadataError::Invalid(format!(
477 "owner exceeds {} bytes",
478 MAX_METADATA_STRING_LEN
479 )));
480 }
481 }
482 if self.tags.len() > MAX_METADATA_TAGS {
483 return Err(MetadataError::Invalid(format!(
484 "tags exceed {} entries",
485 MAX_METADATA_TAGS
486 )));
487 }
488 for tag in &self.tags {
489 if tag.len() > MAX_METADATA_STRING_LEN {
490 return Err(MetadataError::Invalid(format!(
491 "tag exceeds {} bytes",
492 MAX_METADATA_STRING_LEN
493 )));
494 }
495 }
496 if self.roles.len() > MAX_METADATA_TAGS {
497 return Err(MetadataError::Invalid(format!(
498 "roles exceed {} entries",
499 MAX_METADATA_TAGS
500 )));
501 }
502 for role in &self.roles {
503 if role.len() > MAX_METADATA_STRING_LEN {
504 return Err(MetadataError::Invalid(format!(
505 "role exceeds {} bytes",
506 MAX_METADATA_STRING_LEN
507 )));
508 }
509 }
510 if self.custom.len() > MAX_METADATA_CUSTOM_ENTRIES {
511 return Err(MetadataError::Invalid(format!(
512 "custom map exceeds {} entries",
513 MAX_METADATA_CUSTOM_ENTRIES
514 )));
515 }
516 for (k, v) in &self.custom {
517 if k.len() > MAX_METADATA_STRING_LEN {
518 return Err(MetadataError::Invalid(format!(
519 "custom key exceeds {} bytes",
520 MAX_METADATA_STRING_LEN
521 )));
522 }
523 if v.len() > MAX_METADATA_STRING_LEN {
524 return Err(MetadataError::Invalid(format!(
525 "custom value exceeds {} bytes",
526 MAX_METADATA_STRING_LEN
527 )));
528 }
529 }
530 if self.topology.preferred_peers.len() > MAX_PREFERRED_PEERS {
531 return Err(MetadataError::Invalid(format!(
532 "preferred_peers exceed {} entries",
533 MAX_PREFERRED_PEERS
534 )));
535 }
536 if self.topology.hop_distances.len() > MAX_HOP_DISTANCES {
537 return Err(MetadataError::Invalid(format!(
538 "hop_distances exceed {} entries",
539 MAX_HOP_DISTANCES
540 )));
541 }
542 for k in self.topology.hop_distances.keys() {
547 if k.len() > MAX_METADATA_STRING_LEN {
548 return Err(MetadataError::Invalid(format!(
549 "hop_distances key exceeds {} bytes",
550 MAX_METADATA_STRING_LEN
551 )));
552 }
553 }
554 if self.topology.public_addresses.len() > MAX_PUBLIC_ADDRESSES {
555 return Err(MetadataError::Invalid(format!(
556 "public_addresses exceed {} entries",
557 MAX_PUBLIC_ADDRESSES
558 )));
559 }
560 if let Some(loc) = &self.location {
566 for (label, field) in [
567 ("location.zone", &loc.zone),
568 ("location.provider", &loc.provider),
569 ("location.datacenter", &loc.datacenter),
570 ("location.country_code", &loc.country_code),
571 ("location.city", &loc.city),
572 ] {
573 if let Some(v) = field {
574 if v.len() > MAX_METADATA_STRING_LEN {
575 return Err(MetadataError::Invalid(format!(
576 "{label} exceeds {MAX_METADATA_STRING_LEN} bytes",
577 )));
578 }
579 }
580 }
581 }
582 Ok(())
583 }
584
585 pub fn new(node_id: NodeId) -> Self {
587 let now = SystemTime::now()
588 .duration_since(UNIX_EPOCH)
589 .unwrap_or_default()
590 .as_millis() as u64;
591
592 Self {
593 node_id,
594 name: None,
595 description: None,
596 owner: None,
597 location: None,
598 topology: TopologyHints::default(),
599 status: NodeStatus::Starting,
600 custom: HashMap::new(),
601 version: 1,
602 updated_at: now,
603 created_at: now,
604 tags: HashSet::new(),
605 roles: HashSet::new(),
606 }
607 }
608
609 pub fn with_name(mut self, name: impl Into<String>) -> Self {
611 self.name = Some(name.into());
612 self
613 }
614
615 pub fn with_owner(mut self, owner: impl Into<String>) -> Self {
617 self.owner = Some(owner.into());
618 self
619 }
620
621 pub fn with_location(mut self, location: LocationInfo) -> Self {
623 self.location = Some(location);
624 self
625 }
626
627 pub fn with_topology(mut self, topology: TopologyHints) -> Self {
629 self.topology = topology;
630 self
631 }
632
633 pub fn with_status(mut self, status: NodeStatus) -> Self {
635 self.status = status;
636 self
637 }
638
639 pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
641 self.tags.insert(tag.into());
642 self
643 }
644
645 pub fn with_role(mut self, role: impl Into<String>) -> Self {
647 self.roles.insert(role.into());
648 self
649 }
650
651 pub fn with_custom(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
653 self.custom.insert(key.into(), value.into());
654 self
655 }
656
657 pub fn touch(&mut self) {
659 self.version += 1;
660 self.updated_at = SystemTime::now()
661 .duration_since(UNIX_EPOCH)
662 .unwrap_or_default()
663 .as_millis() as u64;
664 }
665
666 pub fn age(&self) -> Duration {
668 let now = SystemTime::now()
669 .duration_since(UNIX_EPOCH)
670 .unwrap_or_default()
671 .as_millis() as u64;
672 Duration::from_millis(now.saturating_sub(self.updated_at))
673 }
674
675 pub fn is_stale(&self, max_age: Duration) -> bool {
677 self.age() > max_age
678 }
679
680 pub fn routing_score(&self) -> f64 {
682 let mut score = 0.0;
683
684 score += (self.status.routing_priority() as f64) * 0.1;
686
687 score += self.topology.connectivity_score() * 0.3;
689
690 score += (self.topology.tier.priority() as f64) * 0.04;
692
693 score.min(1.0)
694 }
695}
696
697#[derive(Debug, Clone, Default)]
699pub struct MetadataQuery {
700 pub status: Option<NodeStatus>,
702 pub statuses: Option<Vec<NodeStatus>>,
704 pub continent: Option<String>,
706 pub region: Option<Region>,
708 pub min_tier: Option<NetworkTier>,
710 pub tags: Option<Vec<String>>,
712 pub roles: Option<Vec<String>>,
714 pub owner: Option<String>,
716 pub max_age: Option<Duration>,
718 pub accepts_work: Option<bool>,
720 pub can_relay: Option<bool>,
722 pub limit: Option<usize>,
724}
725
726impl MetadataQuery {
727 pub fn new() -> Self {
729 Self::default()
730 }
731
732 pub fn with_status(mut self, status: NodeStatus) -> Self {
734 self.status = Some(status);
735 self
736 }
737
738 pub fn with_statuses(mut self, statuses: Vec<NodeStatus>) -> Self {
740 self.statuses = Some(statuses);
741 self
742 }
743
744 pub fn with_continent(mut self, continent: impl Into<String>) -> Self {
746 self.continent = Some(continent.into());
747 self
748 }
749
750 pub fn with_region(mut self, region: Region) -> Self {
752 self.region = Some(region);
753 self
754 }
755
756 pub fn with_min_tier(mut self, tier: NetworkTier) -> Self {
758 self.min_tier = Some(tier);
759 self
760 }
761
762 pub fn with_tags(mut self, tags: Vec<String>) -> Self {
764 self.tags = Some(tags);
765 self
766 }
767
768 pub fn with_roles(mut self, roles: Vec<String>) -> Self {
770 self.roles = Some(roles);
771 self
772 }
773
774 pub fn with_owner(mut self, owner: impl Into<String>) -> Self {
776 self.owner = Some(owner.into());
777 self
778 }
779
780 pub fn with_max_age(mut self, max_age: Duration) -> Self {
782 self.max_age = Some(max_age);
783 self
784 }
785
786 pub fn accepting_work(mut self) -> Self {
788 self.accepts_work = Some(true);
789 self
790 }
791
792 pub fn can_relay(mut self) -> Self {
794 self.can_relay = Some(true);
795 self
796 }
797
798 pub fn with_limit(mut self, limit: usize) -> Self {
800 self.limit = Some(limit);
801 self
802 }
803
804 pub fn matches(&self, meta: &NodeMetadata) -> bool {
806 if let Some(status) = self.status {
808 if meta.status != status {
809 return false;
810 }
811 }
812
813 if let Some(ref statuses) = self.statuses {
815 if !statuses.contains(&meta.status) {
816 return false;
817 }
818 }
819
820 if let Some(ref continent) = self.continent {
822 if let Some(ref loc) = meta.location {
823 if loc.region.continent() != continent {
824 return false;
825 }
826 } else {
827 return false;
828 }
829 }
830
831 if let Some(ref region) = self.region {
833 if let Some(ref loc) = meta.location {
834 if &loc.region != region {
835 return false;
836 }
837 } else {
838 return false;
839 }
840 }
841
842 if let Some(min_tier) = self.min_tier {
844 if meta.topology.tier < min_tier {
845 return false;
846 }
847 }
848
849 if let Some(ref tags) = self.tags {
851 for tag in tags {
852 if !meta.tags.contains(tag) {
853 return false;
854 }
855 }
856 }
857
858 if let Some(ref roles) = self.roles {
860 for role in roles {
861 if !meta.roles.contains(role) {
862 return false;
863 }
864 }
865 }
866
867 if let Some(ref owner) = self.owner {
869 if meta.owner.as_ref() != Some(owner) {
870 return false;
871 }
872 }
873
874 if let Some(max_age) = self.max_age {
876 if meta.is_stale(max_age) {
877 return false;
878 }
879 }
880
881 if let Some(accepts) = self.accepts_work {
883 if meta.status.accepts_work() != accepts {
884 return false;
885 }
886 }
887
888 if let Some(can_relay) = self.can_relay {
890 if meta.topology.can_relay != can_relay {
891 return false;
892 }
893 }
894
895 true
896 }
897}
898
899#[derive(Debug, Clone, PartialEq, Eq)]
901pub enum MetadataError {
902 NotFound(NodeId),
904 VersionConflict {
906 expected: u64,
908 actual: u64,
910 },
911 Invalid(String),
913 CapacityExceeded,
915}
916
917impl std::fmt::Display for MetadataError {
918 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919 match self {
920 MetadataError::NotFound(_) => write!(f, "Node not found"),
921 MetadataError::VersionConflict { expected, actual } => {
922 write!(f, "Version conflict: expected {}, got {}", expected, actual)
923 }
924 MetadataError::Invalid(msg) => write!(f, "Invalid metadata: {}", msg),
925 MetadataError::CapacityExceeded => write!(f, "Store capacity exceeded"),
926 }
927 }
928}
929
930impl std::error::Error for MetadataError {}
931
932#[derive(Debug, Clone, Default)]
934pub struct MetadataStoreStats {
935 pub total_nodes: usize,
937 pub by_status: HashMap<NodeStatus, usize>,
939 pub by_tier: HashMap<NetworkTier, usize>,
941 pub by_continent: HashMap<String, usize>,
943 pub queries: u64,
945 pub updates: u64,
947}
948
949pub struct MetadataStore {
951 nodes: DashMap<NodeId, Arc<NodeMetadata>>,
953 by_status: DashMap<NodeStatus, HashSet<NodeId>>,
955 by_tier: DashMap<NetworkTier, HashSet<NodeId>>,
957 by_continent: DashMap<String, HashSet<NodeId>>,
959 by_tag: DashMap<String, HashSet<NodeId>>,
961 by_role: DashMap<String, HashSet<NodeId>>,
963 by_owner: DashMap<String, HashSet<NodeId>>,
965 query_count: AtomicU64,
967 update_count: AtomicU64,
969 node_count: AtomicUsize,
974 max_capacity: Option<usize>,
976}
977
978impl MetadataStore {
979 pub fn new() -> Self {
981 Self {
982 nodes: DashMap::new(),
983 by_status: DashMap::new(),
984 by_tier: DashMap::new(),
985 by_continent: DashMap::new(),
986 by_tag: DashMap::new(),
987 by_role: DashMap::new(),
988 by_owner: DashMap::new(),
989 query_count: AtomicU64::new(0),
990 update_count: AtomicU64::new(0),
991 node_count: AtomicUsize::new(0),
992 max_capacity: None,
993 }
994 }
995
996 pub fn with_capacity(max_capacity: usize) -> Self {
998 let mut store = Self::new();
999 store.max_capacity = Some(max_capacity);
1000 store
1001 }
1002
1003 pub fn upsert(&self, metadata: NodeMetadata) -> Result<(), MetadataError> {
1021 metadata.validate_bounds()?;
1028
1029 let node_id = metadata.node_id;
1030
1031 if let Some(max) = self.max_capacity {
1040 if !self.nodes.contains_key(&node_id) && self.node_count.load(Ordering::Relaxed) >= max
1041 {
1042 return Err(MetadataError::CapacityExceeded);
1043 }
1044 }
1045
1046 use dashmap::mapref::entry::Entry;
1067 match self.nodes.entry(node_id) {
1068 Entry::Vacant(slot) => {
1069 self.add_to_indexes(&metadata);
1070 slot.insert(Arc::new(metadata));
1071 self.node_count.fetch_add(1, Ordering::Relaxed);
1073 }
1074 Entry::Occupied(mut slot) => {
1075 let old = slot.get().clone();
1080 self.remove_from_indexes(&old);
1081 self.add_to_indexes(&metadata);
1082 slot.insert(Arc::new(metadata));
1083 }
1084 }
1085 self.update_count.fetch_add(1, Ordering::Relaxed);
1086
1087 Ok(())
1088 }
1089
1090 pub fn update_versioned(
1092 &self,
1093 metadata: NodeMetadata,
1094 expected_version: u64,
1095 ) -> Result<(), MetadataError> {
1096 let node_id = metadata.node_id;
1097
1098 if let Some(existing) = self.nodes.get(&node_id) {
1100 if existing.version != expected_version {
1101 return Err(MetadataError::VersionConflict {
1102 expected: expected_version,
1103 actual: existing.version,
1104 });
1105 }
1106 }
1107
1108 self.upsert(metadata)
1109 }
1110
1111 pub fn get(&self, node_id: &NodeId) -> Option<Arc<NodeMetadata>> {
1113 self.nodes.get(node_id).map(|r| Arc::clone(&r))
1114 }
1115
1116 pub fn remove(&self, node_id: &NodeId) -> Option<Arc<NodeMetadata>> {
1118 if let Some((_, meta)) = self.nodes.remove(node_id) {
1119 self.remove_from_indexes(&meta);
1120 self.node_count.fetch_sub(1, Ordering::Relaxed);
1121 Some(meta)
1122 } else {
1123 None
1124 }
1125 }
1126
1127 pub fn query(&self, query: &MetadataQuery) -> Vec<Arc<NodeMetadata>> {
1129 self.query_count.fetch_add(1, Ordering::Relaxed);
1130
1131 let candidates: Vec<NodeId> = if let Some(status) = query.status {
1133 self.by_status
1134 .get(&status)
1135 .map(|s| s.iter().copied().collect())
1136 .unwrap_or_default()
1137 } else if let Some(ref continent) = query.continent {
1138 self.by_continent
1139 .get(continent)
1140 .map(|s| s.iter().copied().collect())
1141 .unwrap_or_default()
1142 } else if let Some(min_tier) = query.min_tier {
1143 let mut nodes = HashSet::new();
1145 for tier in [
1146 NetworkTier::Edge,
1147 NetworkTier::Consumer,
1148 NetworkTier::Business,
1149 NetworkTier::Datacenter,
1150 NetworkTier::Premium,
1151 NetworkTier::Core,
1152 ] {
1153 if tier >= min_tier {
1154 if let Some(tier_nodes) = self.by_tier.get(&tier) {
1155 nodes.extend(tier_nodes.iter().copied());
1156 }
1157 }
1158 }
1159 nodes.into_iter().collect()
1160 } else {
1161 self.nodes.iter().map(|r| *r.key()).collect()
1163 };
1164
1165 let mut results: Vec<Arc<NodeMetadata>> = candidates
1167 .into_iter()
1168 .filter_map(|id| self.nodes.get(&id).map(|r| Arc::clone(&r)))
1169 .filter(|meta| query.matches(meta))
1170 .collect();
1171
1172 if let Some(limit) = query.limit {
1174 results.truncate(limit);
1175 }
1176
1177 results
1178 }
1179
1180 pub fn find_nearby(
1182 &self,
1183 location: &LocationInfo,
1184 max_distance_km: f64,
1185 limit: usize,
1186 ) -> Vec<(Arc<NodeMetadata>, f64)> {
1187 self.query_count.fetch_add(1, Ordering::Relaxed);
1188
1189 let mut results: Vec<(Arc<NodeMetadata>, f64)> = self
1190 .nodes
1191 .iter()
1192 .filter_map(|r| {
1193 let meta = Arc::clone(r.value());
1194 meta.location
1195 .as_ref()
1196 .and_then(|loc| location.distance_to(loc))
1197 .filter(|&d| d <= max_distance_km)
1198 .map(|d| (meta, d))
1199 })
1200 .collect();
1201
1202 results.sort_by(|a, b| {
1211 let a_dist = if a.1.is_nan() { f64::INFINITY } else { a.1 };
1212 let b_dist = if b.1.is_nan() { f64::INFINITY } else { b.1 };
1213 a_dist.total_cmp(&b_dist)
1214 });
1215 results.truncate(limit);
1216
1217 results
1218 }
1219
1220 pub fn find_best_for_routing(&self, limit: usize) -> Vec<Arc<NodeMetadata>> {
1222 self.query_count.fetch_add(1, Ordering::Relaxed);
1223
1224 let mut results: Vec<(Arc<NodeMetadata>, f64)> = self
1225 .nodes
1226 .iter()
1227 .filter(|r| r.value().status.accepts_work())
1228 .map(|r| {
1229 let meta = Arc::clone(r.value());
1230 let score = meta.routing_score();
1231 (meta, score)
1232 })
1233 .collect();
1234
1235 results.sort_by(|a, b| {
1239 let a_score = if a.1.is_nan() { f64::NEG_INFINITY } else { a.1 };
1240 let b_score = if b.1.is_nan() { f64::NEG_INFINITY } else { b.1 };
1241 b_score.total_cmp(&a_score)
1242 });
1243 results.truncate(limit);
1244
1245 results.into_iter().map(|(m, _)| m).collect()
1246 }
1247
1248 pub fn find_relays(&self) -> Vec<Arc<NodeMetadata>> {
1250 self.query(&MetadataQuery::new().can_relay().accepting_work())
1251 }
1252
1253 pub fn stats(&self) -> MetadataStoreStats {
1255 let by_status: HashMap<NodeStatus, usize> = self
1263 .by_status
1264 .iter()
1265 .filter(|e| !e.value().is_empty())
1266 .map(|e| (*e.key(), e.value().len()))
1267 .collect();
1268 let by_tier: HashMap<NetworkTier, usize> = self
1269 .by_tier
1270 .iter()
1271 .filter(|e| !e.value().is_empty())
1272 .map(|e| (*e.key(), e.value().len()))
1273 .collect();
1274 let by_continent: HashMap<String, usize> = self
1275 .by_continent
1276 .iter()
1277 .filter(|e| !e.value().is_empty())
1278 .map(|e| (e.key().clone(), e.value().len()))
1279 .collect();
1280
1281 MetadataStoreStats {
1282 total_nodes: self.node_count.load(Ordering::Relaxed),
1283 by_status,
1284 by_tier,
1285 by_continent,
1286 queries: self.query_count.load(Ordering::Relaxed),
1287 updates: self.update_count.load(Ordering::Relaxed),
1288 }
1289 }
1290
1291 pub fn len(&self) -> usize {
1293 self.node_count.load(Ordering::Relaxed)
1294 }
1295
1296 pub fn is_empty(&self) -> bool {
1298 self.node_count.load(Ordering::Relaxed) == 0
1299 }
1300
1301 pub fn clear(&self) {
1326 let keys: Vec<NodeId> = self.nodes.iter().map(|r| *r.key()).collect();
1333 for key in keys {
1334 if let Some((_, meta)) = self.nodes.remove(&key) {
1335 self.remove_from_indexes(&meta);
1336 self.node_count.fetch_sub(1, Ordering::Relaxed);
1340 }
1341 }
1342 self.by_status.clear();
1347 self.by_tier.clear();
1348 self.by_continent.clear();
1349 self.by_tag.clear();
1350 self.by_role.clear();
1351 self.by_owner.clear();
1352 }
1353
1354 fn add_to_indexes(&self, meta: &NodeMetadata) {
1356 let node_id = meta.node_id;
1357
1358 self.by_status
1360 .entry(meta.status)
1361 .or_default()
1362 .insert(node_id);
1363
1364 self.by_tier
1366 .entry(meta.topology.tier)
1367 .or_default()
1368 .insert(node_id);
1369
1370 if let Some(ref loc) = meta.location {
1372 self.by_continent
1373 .entry(loc.region.continent().to_string())
1374 .or_default()
1375 .insert(node_id);
1376 }
1377
1378 for tag in &meta.tags {
1380 self.by_tag.entry(tag.clone()).or_default().insert(node_id);
1381 }
1382
1383 for role in &meta.roles {
1385 self.by_role
1386 .entry(role.clone())
1387 .or_default()
1388 .insert(node_id);
1389 }
1390
1391 if let Some(ref owner) = meta.owner {
1393 self.by_owner
1394 .entry(owner.clone())
1395 .or_default()
1396 .insert(node_id);
1397 }
1398 }
1399
1400 fn remove_from_indexes(&self, meta: &NodeMetadata) {
1402 let node_id = meta.node_id;
1403
1404 if let Some(mut set) = self.by_status.get_mut(&meta.status) {
1406 set.remove(&node_id);
1407 }
1408
1409 if let Some(mut set) = self.by_tier.get_mut(&meta.topology.tier) {
1411 set.remove(&node_id);
1412 }
1413
1414 if let Some(ref loc) = meta.location {
1416 if let Some(mut set) = self.by_continent.get_mut(loc.region.continent()) {
1417 set.remove(&node_id);
1418 }
1419 }
1420
1421 for tag in &meta.tags {
1423 if let Some(mut set) = self.by_tag.get_mut(tag) {
1424 set.remove(&node_id);
1425 }
1426 }
1427
1428 for role in &meta.roles {
1430 if let Some(mut set) = self.by_role.get_mut(role) {
1431 set.remove(&node_id);
1432 }
1433 }
1434
1435 if let Some(ref owner) = meta.owner {
1437 if let Some(mut set) = self.by_owner.get_mut(owner) {
1438 set.remove(&node_id);
1439 }
1440 }
1441 }
1442}
1443
1444impl Default for MetadataStore {
1445 fn default() -> Self {
1446 Self::new()
1447 }
1448}
1449
1450#[cfg(test)]
1451mod tests {
1452 use super::*;
1453
1454 fn make_node_id(n: u8) -> NodeId {
1455 let mut id = [0u8; 32];
1456 id[0] = n;
1457 id
1458 }
1459
1460 #[test]
1461 fn test_location_distance() {
1462 let ny = LocationInfo::new(Region::NorthAmerica("us-east".into()))
1464 .with_coordinates(40.7128, -74.0060);
1465
1466 let la = LocationInfo::new(Region::NorthAmerica("us-west".into()))
1468 .with_coordinates(34.0522, -118.2437);
1469
1470 let london =
1472 LocationInfo::new(Region::Europe("uk".into())).with_coordinates(51.5074, -0.1278);
1473
1474 let ny_la = ny.distance_to(&la).unwrap();
1475 assert!(ny_la > 3900.0 && ny_la < 4000.0, "NY-LA: {}", ny_la);
1476
1477 let ny_london = ny.distance_to(&london).unwrap();
1478 assert!(
1479 ny_london > 5500.0 && ny_london < 5600.0,
1480 "NY-London: {}",
1481 ny_london
1482 );
1483
1484 assert!(ny.same_continent(&la));
1485 assert!(!ny.same_continent(&london));
1486 }
1487
1488 #[test]
1489 fn test_nat_connectivity() {
1490 assert!(NatType::None.can_connect_direct(&NatType::Symmetric));
1491 assert!(NatType::FullCone.can_connect_direct(&NatType::RestrictedCone));
1492 assert!(!NatType::Symmetric.can_connect_direct(&NatType::Symmetric));
1493 }
1494
1495 #[test]
1496 fn test_topology_score() {
1497 let basic = TopologyHints::new(NetworkTier::Consumer);
1498 let premium = TopologyHints::new(NetworkTier::Premium)
1499 .with_bandwidth(1000, 1000)
1500 .with_nat(NatType::None)
1501 .with_relay(100);
1502
1503 assert!(premium.connectivity_score() > basic.connectivity_score());
1504 }
1505
1506 #[test]
1507 fn test_node_metadata() {
1508 let node = NodeMetadata::new(make_node_id(1))
1509 .with_name("test-node")
1510 .with_owner("test-org")
1511 .with_tag("gpu")
1512 .with_role("worker")
1513 .with_status(NodeStatus::Online);
1514
1515 assert_eq!(node.name, Some("test-node".into()));
1516 assert!(node.tags.contains("gpu"));
1517 assert!(node.roles.contains("worker"));
1518 assert!(node.status.accepts_work());
1519 }
1520
1521 #[test]
1522 fn test_metadata_store_basic() {
1523 let store = MetadataStore::new();
1524
1525 let node1 = NodeMetadata::new(make_node_id(1))
1526 .with_name("node1")
1527 .with_status(NodeStatus::Online);
1528
1529 let node2 = NodeMetadata::new(make_node_id(2))
1530 .with_name("node2")
1531 .with_status(NodeStatus::Degraded);
1532
1533 store.upsert(node1).unwrap();
1534 store.upsert(node2).unwrap();
1535
1536 assert_eq!(store.len(), 2);
1537
1538 let retrieved = store.get(&make_node_id(1)).unwrap();
1539 assert_eq!(retrieved.name, Some("node1".into()));
1540
1541 store.remove(&make_node_id(1));
1542 assert_eq!(store.len(), 1);
1543 assert!(store.get(&make_node_id(1)).is_none());
1544 }
1545
1546 #[test]
1547 fn test_metadata_query() {
1548 let store = MetadataStore::new();
1549
1550 for i in 0..10 {
1552 let status = if i < 5 {
1553 NodeStatus::Online
1554 } else {
1555 NodeStatus::Degraded
1556 };
1557 let tier = if i < 3 {
1558 NetworkTier::Core
1559 } else {
1560 NetworkTier::Consumer
1561 };
1562
1563 let mut node = NodeMetadata::new(make_node_id(i))
1564 .with_status(status)
1565 .with_topology(TopologyHints::new(tier));
1566
1567 if i % 2 == 0 {
1568 node.tags.insert("even".into());
1569 }
1570
1571 store.upsert(node).unwrap();
1572 }
1573
1574 let online = store.query(&MetadataQuery::new().with_status(NodeStatus::Online));
1576 assert_eq!(online.len(), 5);
1577
1578 let core = store.query(&MetadataQuery::new().with_min_tier(NetworkTier::Core));
1580 assert_eq!(core.len(), 3);
1581
1582 let working = store.query(&MetadataQuery::new().accepting_work());
1584 assert_eq!(working.len(), 10); let limited = store.query(&MetadataQuery::new().with_limit(3));
1588 assert_eq!(limited.len(), 3);
1589 }
1590
1591 #[test]
1592 fn test_find_nearby() {
1593 let store = MetadataStore::new();
1594
1595 let locations = [
1597 (40.7128, -74.0060), (34.0522, -118.2437), (51.5074, -0.1278), ];
1601
1602 for (i, (lat, lon)) in locations.iter().enumerate() {
1603 let node = NodeMetadata::new(make_node_id(i as u8))
1604 .with_location(
1605 LocationInfo::new(Region::NorthAmerica("test".into()))
1606 .with_coordinates(*lat, *lon),
1607 )
1608 .with_status(NodeStatus::Online);
1609 store.upsert(node).unwrap();
1610 }
1611
1612 let ny = LocationInfo::new(Region::NorthAmerica("test".into()))
1614 .with_coordinates(40.7128, -74.0060);
1615
1616 let nearby = store.find_nearby(&ny, 100.0, 10);
1617 assert_eq!(nearby.len(), 1); let nearby = store.find_nearby(&ny, 5000.0, 10);
1620 assert_eq!(nearby.len(), 2); let nearby = store.find_nearby(&ny, 10000.0, 10);
1623 assert_eq!(nearby.len(), 3); }
1625
1626 #[test]
1627 fn test_find_relays() {
1628 let store = MetadataStore::new();
1629
1630 let relay_node = NodeMetadata::new(make_node_id(1))
1631 .with_topology(TopologyHints::new(NetworkTier::Datacenter).with_relay(100))
1632 .with_status(NodeStatus::Online);
1633
1634 let normal_node = NodeMetadata::new(make_node_id(2))
1635 .with_topology(TopologyHints::new(NetworkTier::Consumer))
1636 .with_status(NodeStatus::Online);
1637
1638 store.upsert(relay_node).unwrap();
1639 store.upsert(normal_node).unwrap();
1640
1641 let relays = store.find_relays();
1642 assert_eq!(relays.len(), 1);
1643 }
1644
1645 #[test]
1646 fn test_version_conflict() {
1647 let store = MetadataStore::new();
1648
1649 let node = NodeMetadata::new(make_node_id(1));
1650 store.upsert(node.clone()).unwrap();
1651
1652 let result = store.update_versioned(node.clone(), 999);
1654 assert!(matches!(result, Err(MetadataError::VersionConflict { .. })));
1655
1656 let result = store.update_versioned(node, 1);
1658 assert!(result.is_ok());
1659 }
1660
1661 #[test]
1662 fn test_capacity_limit() {
1663 let store = MetadataStore::with_capacity(2);
1664
1665 store.upsert(NodeMetadata::new(make_node_id(1))).unwrap();
1666 store.upsert(NodeMetadata::new(make_node_id(2))).unwrap();
1667
1668 let result = store.upsert(NodeMetadata::new(make_node_id(3)));
1669 assert!(matches!(result, Err(MetadataError::CapacityExceeded)));
1670
1671 store.upsert(NodeMetadata::new(make_node_id(1))).unwrap();
1673 }
1674
1675 #[test]
1687 fn cr30_read_path_invariant_every_returned_node_passes_validate_bounds() {
1688 let store = MetadataStore::new();
1689 let mut node = NodeMetadata::new(make_node_id(1));
1690 node.tags.insert("training".into());
1691 store.upsert(node).unwrap();
1692
1693 let got = store.get(&make_node_id(1)).expect("inserted node");
1695 got.validate_bounds().expect(
1696 "CR-30: every node returned from MetadataStore::get MUST satisfy \
1697 validate_bounds. If this fires, a write path is bypassing \
1698 upsert's bound check.",
1699 );
1700
1701 let q = MetadataQuery::new();
1703 for entry in store.query(&q) {
1704 entry.validate_bounds().expect(
1705 "CR-30: every node returned from MetadataStore::query MUST \
1706 satisfy validate_bounds.",
1707 );
1708 }
1709 }
1710
1711 #[test]
1725 fn cr18_capacity_check_is_a_soft_cap_under_concurrent_upserts() {
1726 use std::sync::Arc;
1727 use std::thread;
1728
1729 const MAX: usize = 4;
1730 const N_THREADS: usize = 16;
1731
1732 let store = Arc::new(MetadataStore::with_capacity(MAX));
1733 let barrier = Arc::new(std::sync::Barrier::new(N_THREADS));
1734
1735 let mut handles = Vec::with_capacity(N_THREADS);
1736 for t in 0..N_THREADS {
1737 let store = Arc::clone(&store);
1738 let barrier = Arc::clone(&barrier);
1739 handles.push(thread::spawn(move || {
1740 barrier.wait();
1741 let id = make_node_id(t as u8 + 1);
1742 store.upsert(NodeMetadata::new(id))
1743 }));
1744 }
1745 let mut accepted = 0usize;
1746 let mut rejected = 0usize;
1747 for h in handles {
1748 match h.join().unwrap() {
1749 Ok(()) => accepted += 1,
1750 Err(MetadataError::CapacityExceeded) => rejected += 1,
1751 Err(other) => panic!("unexpected upsert error: {other:?}"),
1752 }
1753 }
1754
1755 assert!(
1763 accepted >= MAX,
1764 "at least the cap's worth must succeed; got {accepted}"
1765 );
1766 assert!(
1767 accepted + rejected == N_THREADS,
1768 "every upsert must surface either Ok or CapacityExceeded"
1769 );
1770 assert!(
1774 store.nodes.len() <= accepted,
1775 "store size must not exceed the count of successful upserts"
1776 );
1777 }
1780
1781 #[test]
1790 fn upsert_rejects_oversized_tags() {
1791 let store = MetadataStore::new();
1792 let mut node = NodeMetadata::new(make_node_id(1));
1793 for i in 0..(MAX_METADATA_TAGS + 1) {
1794 node.tags.insert(format!("t{}", i));
1795 }
1796 let result = store.upsert(node);
1797 assert!(
1798 matches!(result, Err(MetadataError::Invalid(_))),
1799 "oversized tags must surface as MetadataError::Invalid, got {:?}",
1800 result,
1801 );
1802 }
1803
1804 #[test]
1806 fn upsert_rejects_oversized_custom_map() {
1807 let store = MetadataStore::new();
1808 let mut node = NodeMetadata::new(make_node_id(2));
1809 for i in 0..(MAX_METADATA_CUSTOM_ENTRIES + 1) {
1810 node.custom.insert(format!("k{}", i), "v".to_string());
1811 }
1812 assert!(matches!(store.upsert(node), Err(MetadataError::Invalid(_))));
1813 }
1814
1815 #[test]
1817 fn upsert_rejects_oversized_string_fields() {
1818 let store = MetadataStore::new();
1819 let huge = "x".repeat(MAX_METADATA_STRING_LEN + 1);
1820 let node = NodeMetadata::new(make_node_id(3)).with_name(huge);
1821 assert!(matches!(store.upsert(node), Err(MetadataError::Invalid(_))));
1822 }
1823
1824 #[test]
1828 fn upsert_accepts_metadata_at_exact_boundaries() {
1829 let store = MetadataStore::new();
1830 let mut node = NodeMetadata::new(make_node_id(4));
1831 for i in 0..MAX_METADATA_TAGS {
1832 node.tags.insert(format!("t{}", i));
1833 }
1834 let name_at_cap = "x".repeat(MAX_METADATA_STRING_LEN);
1835 node = node.with_name(name_at_cap);
1836 store
1837 .upsert(node)
1838 .expect("metadata at the exact boundaries must be accepted");
1839 }
1840
1841 #[test]
1855 fn upsert_serializes_concurrent_writes_on_same_node_id() {
1856 use std::sync::Arc;
1857 use std::thread;
1858
1859 let store = Arc::new(MetadataStore::new());
1860 let node_id = make_node_id(42);
1861
1862 let seed = NodeMetadata::new(node_id)
1865 .with_status(NodeStatus::Online)
1866 .with_tag("seed")
1867 .with_topology(TopologyHints::new(NetworkTier::Consumer));
1868 store.upsert(seed).unwrap();
1869
1870 let n_iters = 50;
1874 let store_a = store.clone();
1875 let store_b = store.clone();
1876 let h_a = thread::spawn(move || {
1877 for i in 0..n_iters {
1878 let node = NodeMetadata::new(node_id)
1879 .with_status(NodeStatus::Online)
1880 .with_tag(format!("a-{}", i))
1881 .with_topology(TopologyHints::new(NetworkTier::Premium));
1882 store_a.upsert(node).unwrap();
1883 }
1884 });
1885 let h_b = thread::spawn(move || {
1886 for i in 0..n_iters {
1887 let node = NodeMetadata::new(node_id)
1888 .with_status(NodeStatus::Degraded)
1889 .with_tag(format!("b-{}", i))
1890 .with_topology(TopologyHints::new(NetworkTier::Datacenter));
1891 store_b.upsert(node).unwrap();
1892 }
1893 });
1894 h_a.join().unwrap();
1895 h_b.join().unwrap();
1896
1897 let final_meta = store.get(&node_id).expect("node must still exist");
1902 let final_status = final_meta.status;
1903 let final_tags: std::collections::HashSet<&str> =
1904 final_meta.tags.iter().map(|s| s.as_str()).collect();
1905
1906 for status in [
1909 NodeStatus::Online,
1910 NodeStatus::Offline,
1911 NodeStatus::Degraded,
1912 NodeStatus::Starting,
1913 NodeStatus::Maintenance,
1914 ] {
1915 let bucket_has_node = store
1916 .by_status
1917 .get(&status)
1918 .map(|s| s.contains(&node_id))
1919 .unwrap_or(false);
1920 if status == final_status {
1921 assert!(
1922 bucket_has_node,
1923 "final status {:?} bucket must contain the node",
1924 status
1925 );
1926 } else {
1927 assert!(
1928 !bucket_has_node,
1929 "stale status {:?} bucket must NOT contain the node",
1930 status
1931 );
1932 }
1933 }
1934
1935 for i in 0..n_iters {
1939 for prefix in ["a-", "b-"] {
1940 let tag = format!("{}{}", prefix, i);
1941 if final_tags.contains(tag.as_str()) {
1942 continue;
1943 }
1944 let bucket_has_node = store
1945 .by_tag
1946 .get(&tag)
1947 .map(|s| s.contains(&node_id))
1948 .unwrap_or(false);
1949 assert!(
1950 !bucket_has_node,
1951 "stale tag '{}' bucket must NOT contain the node",
1952 tag
1953 );
1954 }
1955 }
1956 let seed_bucket_has_node = store
1958 .by_tag
1959 .get("seed")
1960 .map(|s| s.contains(&node_id))
1961 .unwrap_or(false);
1962 assert!(
1963 !seed_bucket_has_node,
1964 "the original seed tag must have been removed"
1965 );
1966 }
1967
1968 #[test]
1969 fn test_stats() {
1970 let store = MetadataStore::new();
1971
1972 for i in 0..5 {
1973 let node = NodeMetadata::new(make_node_id(i))
1974 .with_status(if i < 3 {
1975 NodeStatus::Online
1976 } else {
1977 NodeStatus::Offline
1978 })
1979 .with_topology(TopologyHints::new(NetworkTier::Consumer));
1980 store.upsert(node).unwrap();
1981 }
1982
1983 store.query(&MetadataQuery::new());
1985 store.query(&MetadataQuery::new());
1986
1987 let stats = store.stats();
1988 assert_eq!(stats.total_nodes, 5);
1989 assert_eq!(stats.by_status.get(&NodeStatus::Online), Some(&3));
1990 assert_eq!(stats.by_status.get(&NodeStatus::Offline), Some(&2));
1991 assert_eq!(stats.queries, 2);
1992 assert_eq!(stats.updates, 5);
1993 }
1994
1995 #[test]
2001 fn stats_and_len_track_inserts_removes_and_status_changes() {
2002 let store = MetadataStore::new();
2003 for i in 0..4u8 {
2004 store
2005 .upsert(NodeMetadata::new(make_node_id(i)).with_status(NodeStatus::Online))
2006 .unwrap();
2007 }
2008 assert_eq!(store.len(), 4);
2009 assert_eq!(store.stats().total_nodes, 4);
2010 assert_eq!(store.stats().by_status.get(&NodeStatus::Online), Some(&4));
2011
2012 store
2015 .upsert(NodeMetadata::new(make_node_id(0)).with_status(NodeStatus::Offline))
2016 .unwrap();
2017 assert_eq!(store.len(), 4, "re-upsert must not change node count");
2018 let s = store.stats();
2019 assert_eq!(s.by_status.get(&NodeStatus::Online), Some(&3));
2020 assert_eq!(s.by_status.get(&NodeStatus::Offline), Some(&1));
2021
2022 store.remove(&make_node_id(0));
2025 assert_eq!(store.len(), 3);
2026 let s = store.stats();
2027 assert_eq!(s.total_nodes, 3);
2028 assert_eq!(
2029 s.by_status.get(&NodeStatus::Offline),
2030 None,
2031 "emptied status bucket must not appear in stats"
2032 );
2033
2034 store.clear();
2035 assert_eq!(store.len(), 0);
2036 assert!(store.is_empty());
2037 assert_eq!(store.stats().total_nodes, 0);
2038 assert!(store.stats().by_status.is_empty());
2039 }
2040
2041 #[test]
2053 fn validate_bounds_rejects_oversized_location_string() {
2054 let mut node = NodeMetadata::new(make_node_id(1));
2055 node.location = Some(LocationInfo {
2056 region: Region::NorthAmerica("us-east".into()),
2057 zone: None,
2058 latitude: None,
2059 longitude: None,
2060 asn: None,
2061 provider: Some("p".repeat(MAX_METADATA_STRING_LEN + 1)),
2062 datacenter: None,
2063 country_code: None,
2064 city: None,
2065 });
2066
2067 match node.validate_bounds() {
2068 Err(MetadataError::Invalid(msg)) => {
2069 assert!(
2070 msg.contains("location.provider"),
2071 "rejection must name the offending field; got: {msg}",
2072 );
2073 }
2074 other => panic!("expected Invalid for oversized location.provider, got {other:?}"),
2075 }
2076 }
2077
2078 #[test]
2083 fn validate_bounds_rejects_oversized_hop_distances_key() {
2084 let mut topo = TopologyHints::new(NetworkTier::Consumer);
2085 topo.hop_distances
2086 .insert("k".repeat(MAX_METADATA_STRING_LEN + 1), 3);
2087 let node = NodeMetadata::new(make_node_id(1)).with_topology(topo);
2088
2089 match node.validate_bounds() {
2090 Err(MetadataError::Invalid(msg)) => {
2091 assert!(
2092 msg.contains("hop_distances key"),
2093 "rejection must name the offending field; got: {msg}",
2094 );
2095 }
2096 other => {
2097 panic!("expected Invalid for oversized hop_distances key, got {other:?}")
2098 }
2099 }
2100 }
2101
2102 #[test]
2108 fn validate_bounds_accepts_at_boundary_lengths() {
2109 let mut node = NodeMetadata::new(make_node_id(1));
2110 node.location = Some(LocationInfo {
2111 region: Region::Europe("uk".into()),
2112 zone: Some("z".repeat(MAX_METADATA_STRING_LEN)),
2113 latitude: None,
2114 longitude: None,
2115 asn: None,
2116 provider: Some("p".repeat(MAX_METADATA_STRING_LEN)),
2117 datacenter: Some("d".repeat(MAX_METADATA_STRING_LEN)),
2118 country_code: Some("c".repeat(MAX_METADATA_STRING_LEN)),
2119 city: Some("y".repeat(MAX_METADATA_STRING_LEN)),
2120 });
2121 let mut topo = TopologyHints::new(NetworkTier::Consumer);
2122 topo.hop_distances
2123 .insert("k".repeat(MAX_METADATA_STRING_LEN), 1);
2124 node.topology = topo;
2125
2126 node.validate_bounds()
2127 .expect("at-boundary nested strings must validate");
2128 }
2129
2130 #[test]
2138 fn region_zone_returns_inner_zone_for_every_variant() {
2139 assert_eq!(Region::NorthAmerica("us-east".into()).zone(), "us-east");
2140 assert_eq!(Region::SouthAmerica("br-sp".into()).zone(), "br-sp");
2141 assert_eq!(Region::Europe("eu-west".into()).zone(), "eu-west");
2142 assert_eq!(Region::AsiaPacific("ap-1".into()).zone(), "ap-1");
2143 assert_eq!(Region::MiddleEast("me-1".into()).zone(), "me-1");
2144 assert_eq!(Region::Africa("af-1".into()).zone(), "af-1");
2145 assert_eq!(Region::Custom("custom-z".into()).zone(), "custom-z");
2146 }
2147
2148 #[test]
2149 fn node_status_routing_priority_orders_variants_correctly() {
2150 assert!(NodeStatus::Online.routing_priority() > NodeStatus::Degraded.routing_priority());
2154 assert!(NodeStatus::Degraded.routing_priority() > NodeStatus::Starting.routing_priority());
2155 assert!(NodeStatus::Starting.routing_priority() > NodeStatus::Draining.routing_priority());
2156 assert_eq!(NodeStatus::Offline.routing_priority(), 0);
2157 assert_eq!(NodeStatus::ShuttingDown.routing_priority(), 0);
2158 assert_eq!(NodeStatus::Maintenance.routing_priority(), 0);
2159 assert_eq!(NodeStatus::Online.routing_priority(), 5);
2160 }
2161
2162 #[test]
2163 fn average_latency_handles_empty_and_populated() {
2164 let mut topo = TopologyHints::new(NetworkTier::Consumer);
2165 assert!(topo.average_latency().is_none(), "empty must be None");
2166
2167 topo.update_latency(make_node_id(1), 100);
2168 topo.update_latency(make_node_id(2), 200);
2169 topo.update_latency(make_node_id(3), 300);
2170 assert_eq!(topo.average_latency(), Some(200.0));
2171 }
2172
2173 #[test]
2174 fn touch_advances_version_and_updated_at() {
2175 let mut node = NodeMetadata::new(make_node_id(1));
2176 let v0 = node.version;
2177 let t0 = node.updated_at;
2178 std::thread::sleep(std::time::Duration::from_millis(5));
2180 node.touch();
2181 assert_eq!(node.version, v0 + 1);
2182 assert!(
2183 node.updated_at >= t0,
2184 "updated_at must not go backward (got {} from {t0})",
2185 node.updated_at,
2186 );
2187 }
2188
2189 #[test]
2190 fn is_stale_compares_age_against_max_age() {
2191 let mut node = NodeMetadata::new(make_node_id(1));
2192 node.updated_at = node.updated_at.saturating_sub(60_000);
2195 assert!(node.is_stale(Duration::from_secs(10)));
2196 assert!(!node.is_stale(Duration::from_secs(3600)));
2197 }
2198
2199 #[test]
2202 fn validate_bounds_rejects_oversized_owner() {
2203 let mut node = NodeMetadata::new(make_node_id(1));
2204 node.owner = Some("o".repeat(MAX_METADATA_STRING_LEN + 1));
2205 let err = node.validate_bounds().unwrap_err();
2206 let msg = format!("{}", err);
2207 assert!(msg.contains("owner"), "error must name 'owner': {msg}");
2208 }
2209
2210 #[test]
2211 fn validate_bounds_rejects_too_many_tags() {
2212 let mut node = NodeMetadata::new(make_node_id(1));
2213 for i in 0..MAX_METADATA_TAGS + 1 {
2214 node.tags.insert(format!("t{i}"));
2215 }
2216 assert!(matches!(
2217 node.validate_bounds(),
2218 Err(MetadataError::Invalid(_))
2219 ));
2220 }
2221
2222 #[test]
2223 fn validate_bounds_rejects_oversized_role() {
2224 let mut node = NodeMetadata::new(make_node_id(1));
2225 node.roles.insert("r".repeat(MAX_METADATA_STRING_LEN + 1));
2226 let err = node.validate_bounds().unwrap_err();
2227 assert!(format!("{}", err).contains("role"));
2228 }
2229
2230 #[test]
2231 fn validate_bounds_rejects_oversized_custom_map_entries() {
2232 let mut node = NodeMetadata::new(make_node_id(1));
2233
2234 node.custom
2236 .insert("k".repeat(MAX_METADATA_STRING_LEN + 1), "v".into());
2237 let err = node.validate_bounds().unwrap_err();
2238 assert!(format!("{}", err).contains("custom key"));
2239
2240 node.custom.clear();
2242 node.custom
2243 .insert("k".into(), "v".repeat(MAX_METADATA_STRING_LEN + 1));
2244 let err = node.validate_bounds().unwrap_err();
2245 assert!(format!("{}", err).contains("custom value"));
2246 }
2247
2248 #[test]
2249 fn validate_bounds_rejects_too_many_custom_entries() {
2250 let mut node = NodeMetadata::new(make_node_id(1));
2251 for i in 0..MAX_METADATA_CUSTOM_ENTRIES + 1 {
2252 node.custom.insert(format!("k{i}"), "v".into());
2253 }
2254 assert!(matches!(
2255 node.validate_bounds(),
2256 Err(MetadataError::Invalid(_))
2257 ));
2258 }
2259}