Skip to main content

net/adapter/net/behavior/
metadata.rs

1//! Phase 4C: Node Metadata Surface (NODE-META)
2//!
3//! This module provides structured node metadata with location awareness,
4//! topology hints, and fast indexing for node discovery and routing.
5
6use dashmap::DashMap;
7use serde::{Deserialize, Serialize};
8use std::collections::{HashMap, HashSet};
9use std::net::IpAddr;
10use std::sync::atomic::{AtomicU64, AtomicUsize, Ordering};
11use std::sync::Arc;
12use std::time::{Duration, SystemTime, UNIX_EPOCH};
13
14/// Unique node identifier (32 bytes)
15pub type NodeId = [u8; 32];
16
17/// Geographic region identifier
18#[derive(Debug, Clone, PartialEq, Eq, Hash, Serialize, Deserialize)]
19pub enum Region {
20    /// North America
21    NorthAmerica(String),
22    /// South America
23    SouthAmerica(String),
24    /// Europe
25    Europe(String),
26    /// Asia Pacific
27    AsiaPacific(String),
28    /// Middle East
29    MiddleEast(String),
30    /// Africa
31    Africa(String),
32    /// Custom region
33    Custom(String),
34}
35
36impl Region {
37    /// Get the continent name
38    pub fn continent(&self) -> &'static str {
39        match self {
40            Region::NorthAmerica(_) => "north_america",
41            Region::SouthAmerica(_) => "south_america",
42            Region::Europe(_) => "europe",
43            Region::AsiaPacific(_) => "asia_pacific",
44            Region::MiddleEast(_) => "middle_east",
45            Region::Africa(_) => "africa",
46            Region::Custom(_) => "custom",
47        }
48    }
49
50    /// Get the zone within the region
51    pub fn zone(&self) -> &str {
52        match self {
53            Region::NorthAmerica(z)
54            | Region::SouthAmerica(z)
55            | Region::Europe(z)
56            | Region::AsiaPacific(z)
57            | Region::MiddleEast(z)
58            | Region::Africa(z)
59            | Region::Custom(z) => z,
60        }
61    }
62}
63
64/// Geographic location information
65#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
66pub struct LocationInfo {
67    /// Geographic region
68    pub region: Region,
69    /// Availability zone within region
70    pub zone: Option<String>,
71    /// Latitude (-90 to 90)
72    pub latitude: Option<f64>,
73    /// Longitude (-180 to 180)
74    pub longitude: Option<f64>,
75    /// Autonomous System Number
76    pub asn: Option<u32>,
77    /// ISP or cloud provider name
78    pub provider: Option<String>,
79    /// Data center identifier
80    pub datacenter: Option<String>,
81    /// Country code (ISO 3166-1 alpha-2)
82    pub country_code: Option<String>,
83    /// City name
84    pub city: Option<String>,
85}
86
87impl LocationInfo {
88    /// Create a new location with just region
89    pub fn new(region: Region) -> Self {
90        Self {
91            region,
92            zone: None,
93            latitude: None,
94            longitude: None,
95            asn: None,
96            provider: None,
97            datacenter: None,
98            country_code: None,
99            city: None,
100        }
101    }
102
103    /// Set coordinates
104    pub fn with_coordinates(mut self, lat: f64, lon: f64) -> Self {
105        self.latitude = Some(lat.clamp(-90.0, 90.0));
106        self.longitude = Some(lon.clamp(-180.0, 180.0));
107        self
108    }
109
110    /// Set ASN
111    pub fn with_asn(mut self, asn: u32) -> Self {
112        self.asn = Some(asn);
113        self
114    }
115
116    /// Set provider
117    pub fn with_provider(mut self, provider: impl Into<String>) -> Self {
118        self.provider = Some(provider.into());
119        self
120    }
121
122    /// Calculate approximate distance to another location in kilometers
123    /// Uses Haversine formula
124    pub fn distance_to(&self, other: &LocationInfo) -> Option<f64> {
125        let (lat1, lon1) = (self.latitude?, self.longitude?);
126        let (lat2, lon2) = (other.latitude?, other.longitude?);
127
128        let r = 6371.0; // Earth's radius in km
129        let d_lat = (lat2 - lat1).to_radians();
130        let d_lon = (lon2 - lon1).to_radians();
131        let lat1_rad = lat1.to_radians();
132        let lat2_rad = lat2.to_radians();
133
134        let a = (d_lat / 2.0).sin().powi(2)
135            + lat1_rad.cos() * lat2_rad.cos() * (d_lon / 2.0).sin().powi(2);
136        let c = 2.0 * a.sqrt().asin();
137
138        Some(r * c)
139    }
140
141    /// Check if same continent
142    pub fn same_continent(&self, other: &LocationInfo) -> bool {
143        self.region.continent() == other.region.continent()
144    }
145
146    /// Check if same region
147    pub fn same_region(&self, other: &LocationInfo) -> bool {
148        self.region == other.region
149    }
150}
151
152/// NAT type for connectivity hints
153#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
154pub enum NatType {
155    /// No NAT - direct connectivity
156    None,
157    /// Full cone NAT
158    FullCone,
159    /// Restricted cone NAT
160    RestrictedCone,
161    /// Port-restricted cone NAT
162    PortRestrictedCone,
163    /// Symmetric NAT (hardest to traverse)
164    Symmetric,
165    /// Unknown NAT type
166    Unknown,
167}
168
169impl NatType {
170    /// NAT traversal difficulty (0 = easiest, 4 = hardest)
171    pub fn difficulty(&self) -> u8 {
172        match self {
173            NatType::None => 0,
174            NatType::FullCone => 1,
175            NatType::RestrictedCone => 2,
176            NatType::PortRestrictedCone => 3,
177            NatType::Symmetric => 4,
178            NatType::Unknown => 3,
179        }
180    }
181
182    /// Can establish direct connection with another NAT type
183    pub fn can_connect_direct(&self, other: &NatType) -> bool {
184        // At least one side should be easy to traverse
185        self.difficulty() + other.difficulty() < 5
186    }
187}
188
189/// Network tier classification
190#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize, PartialOrd, Ord)]
191pub enum NetworkTier {
192    /// Edge device (mobile, IoT)
193    Edge = 0,
194    /// Consumer connection
195    Consumer = 1,
196    /// Business/prosumer connection
197    Business = 2,
198    /// Data center with standard connectivity
199    Datacenter = 3,
200    /// Premium data center with high bandwidth
201    Premium = 4,
202    /// Core infrastructure node
203    Core = 5,
204}
205
206impl NetworkTier {
207    /// Get relative priority for routing (higher = prefer)
208    pub fn priority(&self) -> u8 {
209        *self as u8
210    }
211}
212
213/// Topology hints for routing optimization
214#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
215pub struct TopologyHints {
216    /// Preferred peer node IDs for routing
217    pub preferred_peers: Vec<NodeId>,
218    /// Network tier classification
219    pub tier: NetworkTier,
220    /// Uplink bandwidth in Gbps
221    pub uplink_gbps: Option<u32>,
222    /// Downlink bandwidth in Gbps
223    pub downlink_gbps: Option<u32>,
224    /// NAT type
225    pub nat_type: NatType,
226    /// Whether node can relay traffic
227    pub can_relay: bool,
228    /// Maximum relay connections
229    pub max_relay_connections: Option<u32>,
230    /// Known public addresses
231    pub public_addresses: Vec<IpAddr>,
232    /// STUN-discovered reflexive address
233    pub reflexive_address: Option<IpAddr>,
234    /// Average latency to known peers (node_id -> latency_ms)
235    #[serde(skip)]
236    pub peer_latencies: HashMap<NodeId, u32>,
237    /// Hop count to well-known nodes
238    pub hop_distances: HashMap<String, u8>,
239}
240
241impl Default for TopologyHints {
242    fn default() -> Self {
243        Self {
244            preferred_peers: Vec::new(),
245            tier: NetworkTier::Consumer,
246            uplink_gbps: None,
247            downlink_gbps: None,
248            nat_type: NatType::Unknown,
249            can_relay: false,
250            max_relay_connections: None,
251            public_addresses: Vec::new(),
252            reflexive_address: None,
253            peer_latencies: HashMap::new(),
254            hop_distances: HashMap::new(),
255        }
256    }
257}
258
259impl TopologyHints {
260    /// Create new topology hints
261    pub fn new(tier: NetworkTier) -> Self {
262        Self {
263            tier,
264            ..Default::default()
265        }
266    }
267
268    /// Set bandwidth
269    pub fn with_bandwidth(mut self, uplink: u32, downlink: u32) -> Self {
270        self.uplink_gbps = Some(uplink);
271        self.downlink_gbps = Some(downlink);
272        self
273    }
274
275    /// Set NAT type
276    pub fn with_nat(mut self, nat_type: NatType) -> Self {
277        self.nat_type = nat_type;
278        self
279    }
280
281    /// Enable relay capability
282    pub fn with_relay(mut self, max_connections: u32) -> Self {
283        self.can_relay = true;
284        self.max_relay_connections = Some(max_connections);
285        self
286    }
287
288    /// Add a preferred peer
289    pub fn add_preferred_peer(&mut self, peer: NodeId) {
290        if !self.preferred_peers.contains(&peer) {
291            self.preferred_peers.push(peer);
292        }
293    }
294
295    /// Update peer latency
296    pub fn update_latency(&mut self, peer: NodeId, latency_ms: u32) {
297        self.peer_latencies.insert(peer, latency_ms);
298    }
299
300    /// Get average latency to all known peers
301    pub fn average_latency(&self) -> Option<f64> {
302        if self.peer_latencies.is_empty() {
303            return None;
304        }
305        let sum: u64 = self.peer_latencies.values().map(|&v| v as u64).sum();
306        Some(sum as f64 / self.peer_latencies.len() as f64)
307    }
308
309    /// Estimate connectivity quality (0.0 - 1.0)
310    pub fn connectivity_score(&self) -> f64 {
311        let mut score = 0.0;
312
313        // Tier bonus (0-0.3)
314        score += (self.tier.priority() as f64) * 0.06;
315
316        // NAT type (0-0.2)
317        score += (4 - self.nat_type.difficulty()) as f64 * 0.05;
318
319        // Has public address (0.1)
320        if !self.public_addresses.is_empty() {
321            score += 0.1;
322        }
323
324        // Bandwidth (0-0.2)
325        if let Some(uplink) = self.uplink_gbps {
326            score += (uplink.min(1000) as f64 / 1000.0) * 0.2;
327        }
328
329        // Can relay (0.1)
330        if self.can_relay {
331            score += 0.1;
332        }
333
334        score.min(1.0)
335    }
336}
337
338/// Node operational status
339#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash, Serialize, Deserialize)]
340pub enum NodeStatus {
341    /// Fully operational
342    Online,
343    /// Operational but with reduced capacity
344    Degraded,
345    /// Draining connections, no new work
346    Draining,
347    /// Under maintenance, may return
348    Maintenance,
349    /// Not responding
350    Offline,
351    /// Starting up
352    Starting,
353    /// Gracefully shutting down
354    ShuttingDown,
355}
356
357impl NodeStatus {
358    /// Whether node can accept new work
359    pub fn accepts_work(&self) -> bool {
360        matches!(self, NodeStatus::Online | NodeStatus::Degraded)
361    }
362
363    /// Whether node is reachable
364    pub fn is_reachable(&self) -> bool {
365        !matches!(self, NodeStatus::Offline)
366    }
367
368    /// Priority for routing (higher = prefer)
369    pub fn routing_priority(&self) -> u8 {
370        match self {
371            NodeStatus::Online => 5,
372            NodeStatus::Degraded => 3,
373            NodeStatus::Draining => 1,
374            NodeStatus::ShuttingDown => 0,
375            NodeStatus::Starting => 2,
376            NodeStatus::Maintenance => 0,
377            NodeStatus::Offline => 0,
378        }
379    }
380}
381
382/// Maximum allowed length of any single string field in
383/// `NodeMetadata` (`name`, `description`, `owner`, individual tag /
384/// role / `custom` keys and values). 1 KiB is far past any
385/// realistic operator-supplied label and bounds the per-string
386/// memory cost.
387pub const MAX_METADATA_STRING_LEN: usize = 1024;
388
389/// Maximum number of tags or roles in `NodeMetadata`. 256 is far
390/// past real usage (typical deployments use a handful of
391/// scope/tier/region tags); without this cap a peer could ship
392/// millions and turn one announcement into millions of `by_tag`
393/// index entries.
394pub const MAX_METADATA_TAGS: usize = 256;
395
396/// Maximum number of entries in the `custom` key-value map. Same
397/// pattern as tags — bounds the per-announcement footprint.
398pub const MAX_METADATA_CUSTOM_ENTRIES: usize = 256;
399
400/// Maximum number of `preferred_peers` in `TopologyHints`. Each
401/// entry is a 32-byte `NodeId`; a 4096-cap bounds the wire/heap
402/// cost while staying well above any realistic peering preference
403/// list.
404pub const MAX_PREFERRED_PEERS: usize = 4096;
405
406/// Maximum number of `hop_distances` entries in `TopologyHints`.
407pub const MAX_HOP_DISTANCES: usize = 4096;
408
409/// Maximum number of `public_addresses` (multi-homed nodes
410/// typically advertise <16; 256 is generous).
411pub const MAX_PUBLIC_ADDRESSES: usize = 256;
412
413/// Complete node metadata
414#[derive(Debug, Clone, PartialEq, Serialize, Deserialize)]
415pub struct NodeMetadata {
416    /// Unique node identifier
417    pub node_id: NodeId,
418    /// Human-readable node name
419    pub name: Option<String>,
420    /// Node description
421    pub description: Option<String>,
422    /// Owner identifier (org, user, etc.)
423    pub owner: Option<String>,
424    /// Geographic location
425    pub location: Option<LocationInfo>,
426    /// Topology hints
427    pub topology: TopologyHints,
428    /// Current status
429    pub status: NodeStatus,
430    /// Custom key-value metadata
431    pub custom: HashMap<String, String>,
432    /// Metadata version (monotonic)
433    pub version: u64,
434    /// Last update timestamp (Unix millis)
435    pub updated_at: u64,
436    /// Creation timestamp (Unix millis)
437    pub created_at: u64,
438    /// Tags for categorization
439    pub tags: HashSet<String>,
440    /// Node roles
441    pub roles: HashSet<String>,
442}
443
444impl NodeMetadata {
445    /// Validate that the metadata fits within the per-field
446    /// boundedness caps (string lengths, tag/role/custom counts,
447    /// preferred-peers / hop-distances / public-addresses sizes).
448    ///
449    /// Without these caps the deserialize path is unbounded —
450    /// every `Vec` / `HashMap` / `String` would accept whatever
451    /// the peer shipped, and `MetadataStore::upsert` would
452    /// happily index millions of attacker-supplied tags into the
453    /// per-tag inverted-index DashMap. `upsert` (and
454    /// `update_versioned`) call `validate_bounds` before touching
455    /// the indexes; oversized metadata surfaces as
456    /// `MetadataError::Invalid(...)`.
457    pub fn validate_bounds(&self) -> Result<(), MetadataError> {
458        if let Some(name) = &self.name {
459            if name.len() > MAX_METADATA_STRING_LEN {
460                return Err(MetadataError::Invalid(format!(
461                    "name exceeds {} bytes",
462                    MAX_METADATA_STRING_LEN
463                )));
464            }
465        }
466        if let Some(d) = &self.description {
467            if d.len() > MAX_METADATA_STRING_LEN {
468                return Err(MetadataError::Invalid(format!(
469                    "description exceeds {} bytes",
470                    MAX_METADATA_STRING_LEN
471                )));
472            }
473        }
474        if let Some(o) = &self.owner {
475            if o.len() > MAX_METADATA_STRING_LEN {
476                return Err(MetadataError::Invalid(format!(
477                    "owner exceeds {} bytes",
478                    MAX_METADATA_STRING_LEN
479                )));
480            }
481        }
482        if self.tags.len() > MAX_METADATA_TAGS {
483            return Err(MetadataError::Invalid(format!(
484                "tags exceed {} entries",
485                MAX_METADATA_TAGS
486            )));
487        }
488        for tag in &self.tags {
489            if tag.len() > MAX_METADATA_STRING_LEN {
490                return Err(MetadataError::Invalid(format!(
491                    "tag exceeds {} bytes",
492                    MAX_METADATA_STRING_LEN
493                )));
494            }
495        }
496        if self.roles.len() > MAX_METADATA_TAGS {
497            return Err(MetadataError::Invalid(format!(
498                "roles exceed {} entries",
499                MAX_METADATA_TAGS
500            )));
501        }
502        for role in &self.roles {
503            if role.len() > MAX_METADATA_STRING_LEN {
504                return Err(MetadataError::Invalid(format!(
505                    "role exceeds {} bytes",
506                    MAX_METADATA_STRING_LEN
507                )));
508            }
509        }
510        if self.custom.len() > MAX_METADATA_CUSTOM_ENTRIES {
511            return Err(MetadataError::Invalid(format!(
512                "custom map exceeds {} entries",
513                MAX_METADATA_CUSTOM_ENTRIES
514            )));
515        }
516        for (k, v) in &self.custom {
517            if k.len() > MAX_METADATA_STRING_LEN {
518                return Err(MetadataError::Invalid(format!(
519                    "custom key exceeds {} bytes",
520                    MAX_METADATA_STRING_LEN
521                )));
522            }
523            if v.len() > MAX_METADATA_STRING_LEN {
524                return Err(MetadataError::Invalid(format!(
525                    "custom value exceeds {} bytes",
526                    MAX_METADATA_STRING_LEN
527                )));
528            }
529        }
530        if self.topology.preferred_peers.len() > MAX_PREFERRED_PEERS {
531            return Err(MetadataError::Invalid(format!(
532                "preferred_peers exceed {} entries",
533                MAX_PREFERRED_PEERS
534            )));
535        }
536        if self.topology.hop_distances.len() > MAX_HOP_DISTANCES {
537            return Err(MetadataError::Invalid(format!(
538                "hop_distances exceed {} entries",
539                MAX_HOP_DISTANCES
540            )));
541        }
542        // hop_distances keys are unbounded `String`s. Without a
543        // per-key length check a peer could ship a single
544        // multi-megabyte key inside a perfectly-counted map and
545        // smuggle the bound past validate_bounds.
546        for k in self.topology.hop_distances.keys() {
547            if k.len() > MAX_METADATA_STRING_LEN {
548                return Err(MetadataError::Invalid(format!(
549                    "hop_distances key exceeds {} bytes",
550                    MAX_METADATA_STRING_LEN
551                )));
552            }
553        }
554        if self.topology.public_addresses.len() > MAX_PUBLIC_ADDRESSES {
555            return Err(MetadataError::Invalid(format!(
556                "public_addresses exceed {} entries",
557                MAX_PUBLIC_ADDRESSES
558            )));
559        }
560        // Nested LocationInfo strings (`zone`, `provider`,
561        // `datacenter`, `country_code`, `city`) MUST be length-
562        // checked. The top-level fields and collection counts
563        // alone wouldn't catch oversized strings pinned inside
564        // `location`.
565        if let Some(loc) = &self.location {
566            for (label, field) in [
567                ("location.zone", &loc.zone),
568                ("location.provider", &loc.provider),
569                ("location.datacenter", &loc.datacenter),
570                ("location.country_code", &loc.country_code),
571                ("location.city", &loc.city),
572            ] {
573                if let Some(v) = field {
574                    if v.len() > MAX_METADATA_STRING_LEN {
575                        return Err(MetadataError::Invalid(format!(
576                            "{label} exceeds {MAX_METADATA_STRING_LEN} bytes",
577                        )));
578                    }
579                }
580            }
581        }
582        Ok(())
583    }
584
585    /// Create new node metadata
586    pub fn new(node_id: NodeId) -> Self {
587        let now = SystemTime::now()
588            .duration_since(UNIX_EPOCH)
589            .unwrap_or_default()
590            .as_millis() as u64;
591
592        Self {
593            node_id,
594            name: None,
595            description: None,
596            owner: None,
597            location: None,
598            topology: TopologyHints::default(),
599            status: NodeStatus::Starting,
600            custom: HashMap::new(),
601            version: 1,
602            updated_at: now,
603            created_at: now,
604            tags: HashSet::new(),
605            roles: HashSet::new(),
606        }
607    }
608
609    /// Set name
610    pub fn with_name(mut self, name: impl Into<String>) -> Self {
611        self.name = Some(name.into());
612        self
613    }
614
615    /// Set owner
616    pub fn with_owner(mut self, owner: impl Into<String>) -> Self {
617        self.owner = Some(owner.into());
618        self
619    }
620
621    /// Set location
622    pub fn with_location(mut self, location: LocationInfo) -> Self {
623        self.location = Some(location);
624        self
625    }
626
627    /// Set topology
628    pub fn with_topology(mut self, topology: TopologyHints) -> Self {
629        self.topology = topology;
630        self
631    }
632
633    /// Set status
634    pub fn with_status(mut self, status: NodeStatus) -> Self {
635        self.status = status;
636        self
637    }
638
639    /// Add tag
640    pub fn with_tag(mut self, tag: impl Into<String>) -> Self {
641        self.tags.insert(tag.into());
642        self
643    }
644
645    /// Add role
646    pub fn with_role(mut self, role: impl Into<String>) -> Self {
647        self.roles.insert(role.into());
648        self
649    }
650
651    /// Add custom metadata
652    pub fn with_custom(mut self, key: impl Into<String>, value: impl Into<String>) -> Self {
653        self.custom.insert(key.into(), value.into());
654        self
655    }
656
657    /// Update and increment version
658    pub fn touch(&mut self) {
659        self.version += 1;
660        self.updated_at = SystemTime::now()
661            .duration_since(UNIX_EPOCH)
662            .unwrap_or_default()
663            .as_millis() as u64;
664    }
665
666    /// Get age since last update
667    pub fn age(&self) -> Duration {
668        let now = SystemTime::now()
669            .duration_since(UNIX_EPOCH)
670            .unwrap_or_default()
671            .as_millis() as u64;
672        Duration::from_millis(now.saturating_sub(self.updated_at))
673    }
674
675    /// Check if metadata is stale
676    pub fn is_stale(&self, max_age: Duration) -> bool {
677        self.age() > max_age
678    }
679
680    /// Calculate routing score for this node
681    pub fn routing_score(&self) -> f64 {
682        let mut score = 0.0;
683
684        // Status priority (0-0.5)
685        score += (self.status.routing_priority() as f64) * 0.1;
686
687        // Topology score (0-0.3)
688        score += self.topology.connectivity_score() * 0.3;
689
690        // Tier bonus (0-0.2)
691        score += (self.topology.tier.priority() as f64) * 0.04;
692
693        score.min(1.0)
694    }
695}
696
697/// Query filter for metadata store
698#[derive(Debug, Clone, Default)]
699pub struct MetadataQuery {
700    /// Filter by status
701    pub status: Option<NodeStatus>,
702    /// Filter by statuses (any match)
703    pub statuses: Option<Vec<NodeStatus>>,
704    /// Filter by region continent
705    pub continent: Option<String>,
706    /// Filter by region
707    pub region: Option<Region>,
708    /// Filter by minimum tier
709    pub min_tier: Option<NetworkTier>,
710    /// Filter by tag (all must match)
711    pub tags: Option<Vec<String>>,
712    /// Filter by role (all must match)
713    pub roles: Option<Vec<String>>,
714    /// Filter by owner
715    pub owner: Option<String>,
716    /// Maximum age since last update
717    pub max_age: Option<Duration>,
718    /// Must accept new work
719    pub accepts_work: Option<bool>,
720    /// Must be able to relay
721    pub can_relay: Option<bool>,
722    /// Maximum results
723    pub limit: Option<usize>,
724}
725
726impl MetadataQuery {
727    /// Create empty query
728    pub fn new() -> Self {
729        Self::default()
730    }
731
732    /// Filter by status
733    pub fn with_status(mut self, status: NodeStatus) -> Self {
734        self.status = Some(status);
735        self
736    }
737
738    /// Filter by multiple statuses
739    pub fn with_statuses(mut self, statuses: Vec<NodeStatus>) -> Self {
740        self.statuses = Some(statuses);
741        self
742    }
743
744    /// Filter by continent
745    pub fn with_continent(mut self, continent: impl Into<String>) -> Self {
746        self.continent = Some(continent.into());
747        self
748    }
749
750    /// Filter by region
751    pub fn with_region(mut self, region: Region) -> Self {
752        self.region = Some(region);
753        self
754    }
755
756    /// Filter by minimum tier
757    pub fn with_min_tier(mut self, tier: NetworkTier) -> Self {
758        self.min_tier = Some(tier);
759        self
760    }
761
762    /// Filter by tags
763    pub fn with_tags(mut self, tags: Vec<String>) -> Self {
764        self.tags = Some(tags);
765        self
766    }
767
768    /// Filter by roles
769    pub fn with_roles(mut self, roles: Vec<String>) -> Self {
770        self.roles = Some(roles);
771        self
772    }
773
774    /// Filter by owner
775    pub fn with_owner(mut self, owner: impl Into<String>) -> Self {
776        self.owner = Some(owner.into());
777        self
778    }
779
780    /// Filter by max age
781    pub fn with_max_age(mut self, max_age: Duration) -> Self {
782        self.max_age = Some(max_age);
783        self
784    }
785
786    /// Filter nodes that accept work
787    pub fn accepting_work(mut self) -> Self {
788        self.accepts_work = Some(true);
789        self
790    }
791
792    /// Filter nodes that can relay
793    pub fn can_relay(mut self) -> Self {
794        self.can_relay = Some(true);
795        self
796    }
797
798    /// Limit results
799    pub fn with_limit(mut self, limit: usize) -> Self {
800        self.limit = Some(limit);
801        self
802    }
803
804    /// Check if metadata matches query
805    pub fn matches(&self, meta: &NodeMetadata) -> bool {
806        // Status filter
807        if let Some(status) = self.status {
808            if meta.status != status {
809                return false;
810            }
811        }
812
813        // Multiple statuses
814        if let Some(ref statuses) = self.statuses {
815            if !statuses.contains(&meta.status) {
816                return false;
817            }
818        }
819
820        // Continent filter
821        if let Some(ref continent) = self.continent {
822            if let Some(ref loc) = meta.location {
823                if loc.region.continent() != continent {
824                    return false;
825                }
826            } else {
827                return false;
828            }
829        }
830
831        // Region filter
832        if let Some(ref region) = self.region {
833            if let Some(ref loc) = meta.location {
834                if &loc.region != region {
835                    return false;
836                }
837            } else {
838                return false;
839            }
840        }
841
842        // Tier filter
843        if let Some(min_tier) = self.min_tier {
844            if meta.topology.tier < min_tier {
845                return false;
846            }
847        }
848
849        // Tags filter (all must match)
850        if let Some(ref tags) = self.tags {
851            for tag in tags {
852                if !meta.tags.contains(tag) {
853                    return false;
854                }
855            }
856        }
857
858        // Roles filter (all must match)
859        if let Some(ref roles) = self.roles {
860            for role in roles {
861                if !meta.roles.contains(role) {
862                    return false;
863                }
864            }
865        }
866
867        // Owner filter
868        if let Some(ref owner) = self.owner {
869            if meta.owner.as_ref() != Some(owner) {
870                return false;
871            }
872        }
873
874        // Age filter
875        if let Some(max_age) = self.max_age {
876            if meta.is_stale(max_age) {
877                return false;
878            }
879        }
880
881        // Accepts work filter
882        if let Some(accepts) = self.accepts_work {
883            if meta.status.accepts_work() != accepts {
884                return false;
885            }
886        }
887
888        // Can relay filter
889        if let Some(can_relay) = self.can_relay {
890            if meta.topology.can_relay != can_relay {
891                return false;
892            }
893        }
894
895        true
896    }
897}
898
899/// Metadata store errors
900#[derive(Debug, Clone, PartialEq, Eq)]
901pub enum MetadataError {
902    /// Node not found
903    NotFound(NodeId),
904    /// Version conflict
905    VersionConflict {
906        /// Expected metadata version
907        expected: u64,
908        /// Actual metadata version
909        actual: u64,
910    },
911    /// Invalid metadata
912    Invalid(String),
913    /// Store capacity exceeded
914    CapacityExceeded,
915}
916
917impl std::fmt::Display for MetadataError {
918    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
919        match self {
920            MetadataError::NotFound(_) => write!(f, "Node not found"),
921            MetadataError::VersionConflict { expected, actual } => {
922                write!(f, "Version conflict: expected {}, got {}", expected, actual)
923            }
924            MetadataError::Invalid(msg) => write!(f, "Invalid metadata: {}", msg),
925            MetadataError::CapacityExceeded => write!(f, "Store capacity exceeded"),
926        }
927    }
928}
929
930impl std::error::Error for MetadataError {}
931
932/// Statistics for MetadataStore
933#[derive(Debug, Clone, Default)]
934pub struct MetadataStoreStats {
935    /// Total nodes stored
936    pub total_nodes: usize,
937    /// Nodes by status
938    pub by_status: HashMap<NodeStatus, usize>,
939    /// Nodes by tier
940    pub by_tier: HashMap<NetworkTier, usize>,
941    /// Nodes by continent
942    pub by_continent: HashMap<String, usize>,
943    /// Total queries performed
944    pub queries: u64,
945    /// Total updates performed
946    pub updates: u64,
947}
948
949/// High-performance metadata store with indexes
950pub struct MetadataStore {
951    /// Primary storage
952    nodes: DashMap<NodeId, Arc<NodeMetadata>>,
953    /// Index by status
954    by_status: DashMap<NodeStatus, HashSet<NodeId>>,
955    /// Index by tier
956    by_tier: DashMap<NetworkTier, HashSet<NodeId>>,
957    /// Index by continent
958    by_continent: DashMap<String, HashSet<NodeId>>,
959    /// Index by tag
960    by_tag: DashMap<String, HashSet<NodeId>>,
961    /// Index by role
962    by_role: DashMap<String, HashSet<NodeId>>,
963    /// Index by owner
964    by_owner: DashMap<String, HashSet<NodeId>>,
965    /// Query counter
966    query_count: AtomicU64,
967    /// Update counter
968    update_count: AtomicU64,
969    /// O(1) live node count. `DashMap::len()` walks every shard (~1us); the
970    /// capacity gate in `upsert` and `len()`/`stats()` read this instead.
971    /// Maintained exactly on the Vacant-insert / remove paths. See
972    /// docs/misc/PERF_AUDIT_2026_06_08_BENCHMARK_WINS.md §2/§4.
973    node_count: AtomicUsize,
974    /// Maximum capacity
975    max_capacity: Option<usize>,
976}
977
978impl MetadataStore {
979    /// Create new metadata store
980    pub fn new() -> Self {
981        Self {
982            nodes: DashMap::new(),
983            by_status: DashMap::new(),
984            by_tier: DashMap::new(),
985            by_continent: DashMap::new(),
986            by_tag: DashMap::new(),
987            by_role: DashMap::new(),
988            by_owner: DashMap::new(),
989            query_count: AtomicU64::new(0),
990            update_count: AtomicU64::new(0),
991            node_count: AtomicUsize::new(0),
992            max_capacity: None,
993        }
994    }
995
996    /// Create store with capacity limit
997    pub fn with_capacity(max_capacity: usize) -> Self {
998        let mut store = Self::new();
999        store.max_capacity = Some(max_capacity);
1000        store
1001    }
1002
1003    /// Insert or update node metadata
1004    ///
1005    /// The entire (read-old, remove-from-indexes,
1006    /// add-to-indexes, insert) sequence runs inside
1007    /// `DashMap::entry`'s shard write lock, serializing all
1008    /// concurrent upserts on the same node_id. Splitting this into
1009    /// a 5-step sequence without an overarching lock — (1) capacity
1010    /// check, (2) `nodes.get(&id)`, (3) `remove_from_indexes(&old)`,
1011    /// (4) `add_to_indexes(&new)`, (5) `nodes.insert` — would let
1012    /// two concurrent upserts on the same node both observe the
1013    /// same `old` at step 2, both remove its index entries at step
1014    /// 3 (second a no-op), and both add to indexes at step 4 into
1015    /// DIFFERENT buckets if the metadata differed. Whichever
1016    /// `nodes.insert` landed second would win, but the loser's
1017    /// index entries would never be removed, producing permanent
1018    /// index drift (queries return the node under the wrong
1019    /// filter; stats over-count).
1020    pub fn upsert(&self, metadata: NodeMetadata) -> Result<(), MetadataError> {
1021        // Bound peer-supplied metadata before touching the
1022        // indexes — without this, one peer could ship a single
1023        // announcement carrying millions of unique tags and turn
1024        // it into millions of `by_tag` DashMap entries. Validation
1025        // runs first so we don't even pay the index-clear cost on
1026        // a bad input.
1027        metadata.validate_bounds()?;
1028
1029        let node_id = metadata.node_id;
1030
1031        // Capacity check BEFORE entering the entry guard —
1032        // `self.nodes.len()` walks all shards and would deadlock
1033        // if called while we hold a write guard on one of them.
1034        // The soft-cap race window (a concurrent upsert lands
1035        // between this check and the entry-acquire below) is
1036        // acceptable: the cap is best-effort, mirroring the
1037        // pattern used by `TokenCache::insert_unchecked` and
1038        // `ContextStore::create_context`.
1039        if let Some(max) = self.max_capacity {
1040            if !self.nodes.contains_key(&node_id) && self.node_count.load(Ordering::Relaxed) >= max
1041            {
1042                return Err(MetadataError::CapacityExceeded);
1043            }
1044        }
1045
1046        // Take the per-shard write lock on the node_id entry
1047        // FIRST. Holding it serializes all concurrent upserts on
1048        // this id, so the (remove_from_indexes, add_to_indexes,
1049        // insert) sequence is observed by other upserts as a
1050        // single atomic transition. A read-old outside any lock
1051        // would let two threads both observe the same `old`, both
1052        // `remove_from_indexes(&old)`, both `add_to_indexes` into
1053        // different buckets, and the loser's entries would leak
1054        // into permanent index drift.
1055        //
1056        // `add_to_indexes` / `remove_from_indexes` write to
1057        // OTHER DashMap instances (`by_status`, `by_tag`, etc.).
1058        // The lock-order convention is: hold `nodes` entry FIRST,
1059        // then write the index DashMaps. As long as no other
1060        // operation locks an index DashMap and then reaches into
1061        // `nodes`, we're deadlock-free. The `query` path takes
1062        // index DashMap snapshots first, then reads `nodes`
1063        // afterwards — that order is the inverse of ours, but
1064        // each step's lock is released before the next is taken,
1065        // so there's no held-lock chain to deadlock.
1066        use dashmap::mapref::entry::Entry;
1067        match self.nodes.entry(node_id) {
1068            Entry::Vacant(slot) => {
1069                self.add_to_indexes(&metadata);
1070                slot.insert(Arc::new(metadata));
1071                // Genuine insert (the only branch that grows `nodes`).
1072                self.node_count.fetch_add(1, Ordering::Relaxed);
1073            }
1074            Entry::Occupied(mut slot) => {
1075                // Read the old metadata WHILE holding the entry
1076                // lock — this is the critical change vs. pre-fix,
1077                // where the read-old happened before the lock and
1078                // could be invalidated by a concurrent upsert.
1079                let old = slot.get().clone();
1080                self.remove_from_indexes(&old);
1081                self.add_to_indexes(&metadata);
1082                slot.insert(Arc::new(metadata));
1083            }
1084        }
1085        self.update_count.fetch_add(1, Ordering::Relaxed);
1086
1087        Ok(())
1088    }
1089
1090    /// Update with version check (optimistic locking)
1091    pub fn update_versioned(
1092        &self,
1093        metadata: NodeMetadata,
1094        expected_version: u64,
1095    ) -> Result<(), MetadataError> {
1096        let node_id = metadata.node_id;
1097
1098        // Check version
1099        if let Some(existing) = self.nodes.get(&node_id) {
1100            if existing.version != expected_version {
1101                return Err(MetadataError::VersionConflict {
1102                    expected: expected_version,
1103                    actual: existing.version,
1104                });
1105            }
1106        }
1107
1108        self.upsert(metadata)
1109    }
1110
1111    /// Get node metadata
1112    pub fn get(&self, node_id: &NodeId) -> Option<Arc<NodeMetadata>> {
1113        self.nodes.get(node_id).map(|r| Arc::clone(&r))
1114    }
1115
1116    /// Remove node
1117    pub fn remove(&self, node_id: &NodeId) -> Option<Arc<NodeMetadata>> {
1118        if let Some((_, meta)) = self.nodes.remove(node_id) {
1119            self.remove_from_indexes(&meta);
1120            self.node_count.fetch_sub(1, Ordering::Relaxed);
1121            Some(meta)
1122        } else {
1123            None
1124        }
1125    }
1126
1127    /// Query nodes
1128    pub fn query(&self, query: &MetadataQuery) -> Vec<Arc<NodeMetadata>> {
1129        self.query_count.fetch_add(1, Ordering::Relaxed);
1130
1131        // Use indexes for initial filtering if possible
1132        let candidates: Vec<NodeId> = if let Some(status) = query.status {
1133            self.by_status
1134                .get(&status)
1135                .map(|s| s.iter().copied().collect())
1136                .unwrap_or_default()
1137        } else if let Some(ref continent) = query.continent {
1138            self.by_continent
1139                .get(continent)
1140                .map(|s| s.iter().copied().collect())
1141                .unwrap_or_default()
1142        } else if let Some(min_tier) = query.min_tier {
1143            // Collect all nodes at or above min_tier
1144            let mut nodes = HashSet::new();
1145            for tier in [
1146                NetworkTier::Edge,
1147                NetworkTier::Consumer,
1148                NetworkTier::Business,
1149                NetworkTier::Datacenter,
1150                NetworkTier::Premium,
1151                NetworkTier::Core,
1152            ] {
1153                if tier >= min_tier {
1154                    if let Some(tier_nodes) = self.by_tier.get(&tier) {
1155                        nodes.extend(tier_nodes.iter().copied());
1156                    }
1157                }
1158            }
1159            nodes.into_iter().collect()
1160        } else {
1161            // Full scan
1162            self.nodes.iter().map(|r| *r.key()).collect()
1163        };
1164
1165        // Filter and collect results
1166        let mut results: Vec<Arc<NodeMetadata>> = candidates
1167            .into_iter()
1168            .filter_map(|id| self.nodes.get(&id).map(|r| Arc::clone(&r)))
1169            .filter(|meta| query.matches(meta))
1170            .collect();
1171
1172        // Apply limit
1173        if let Some(limit) = query.limit {
1174            results.truncate(limit);
1175        }
1176
1177        results
1178    }
1179
1180    /// Find nodes near a location
1181    pub fn find_nearby(
1182        &self,
1183        location: &LocationInfo,
1184        max_distance_km: f64,
1185        limit: usize,
1186    ) -> Vec<(Arc<NodeMetadata>, f64)> {
1187        self.query_count.fetch_add(1, Ordering::Relaxed);
1188
1189        let mut results: Vec<(Arc<NodeMetadata>, f64)> = self
1190            .nodes
1191            .iter()
1192            .filter_map(|r| {
1193                let meta = Arc::clone(r.value());
1194                meta.location
1195                    .as_ref()
1196                    .and_then(|loc| location.distance_to(loc))
1197                    .filter(|&d| d <= max_distance_km)
1198                    .map(|d| (meta, d))
1199            })
1200            .collect();
1201
1202        // `partial_cmp(...).unwrap_or(Equal)` on NaN produces a
1203        // non-total order — `sort_by` would permute arbitrarily
1204        // and `truncate(limit)` would then drop random items.
1205        // `LocationInfo::distance_to` computes `(...).asin()` for
1206        // near-antipodal points where FP rounding can push the
1207        // asin argument > 1.0 → NaN. `total_cmp` on a
1208        // NaN-sentinel score (`f64::INFINITY` so NaN distances
1209        // sink to the end) gives a deterministic total order.
1210        results.sort_by(|a, b| {
1211            let a_dist = if a.1.is_nan() { f64::INFINITY } else { a.1 };
1212            let b_dist = if b.1.is_nan() { f64::INFINITY } else { b.1 };
1213            a_dist.total_cmp(&b_dist)
1214        });
1215        results.truncate(limit);
1216
1217        results
1218    }
1219
1220    /// Find best nodes for routing
1221    pub fn find_best_for_routing(&self, limit: usize) -> Vec<Arc<NodeMetadata>> {
1222        self.query_count.fetch_add(1, Ordering::Relaxed);
1223
1224        let mut results: Vec<(Arc<NodeMetadata>, f64)> = self
1225            .nodes
1226            .iter()
1227            .filter(|r| r.value().status.accepts_work())
1228            .map(|r| {
1229                let meta = Arc::clone(r.value());
1230                let score = meta.routing_score();
1231                (meta, score)
1232            })
1233            .collect();
1234
1235        // Same hazard as `find_nearby` above. Sort descending
1236        // with `total_cmp`; NaN scores sink to the end (treated
1237        // as `f64::NEG_INFINITY` for descending).
1238        results.sort_by(|a, b| {
1239            let a_score = if a.1.is_nan() { f64::NEG_INFINITY } else { a.1 };
1240            let b_score = if b.1.is_nan() { f64::NEG_INFINITY } else { b.1 };
1241            b_score.total_cmp(&a_score)
1242        });
1243        results.truncate(limit);
1244
1245        results.into_iter().map(|(m, _)| m).collect()
1246    }
1247
1248    /// Find relay nodes
1249    pub fn find_relays(&self) -> Vec<Arc<NodeMetadata>> {
1250        self.query(&MetadataQuery::new().can_relay().accepting_work())
1251    }
1252
1253    /// Get statistics
1254    pub fn stats(&self) -> MetadataStoreStats {
1255        // Histograms come straight from the inverted indexes, which
1256        // add_to_indexes/remove_from_indexes keep in sync. This is
1257        // O(distinct keys) — a handful of statuses/tiers/continents —
1258        // instead of an O(nodes) full scan with a String allocation per
1259        // node. `remove_from_indexes` can leave an empty set behind, so
1260        // skip zero-count buckets to match the old scan's "absent key"
1261        // output (a status with no nodes was simply never in the map).
1262        let by_status: HashMap<NodeStatus, usize> = self
1263            .by_status
1264            .iter()
1265            .filter(|e| !e.value().is_empty())
1266            .map(|e| (*e.key(), e.value().len()))
1267            .collect();
1268        let by_tier: HashMap<NetworkTier, usize> = self
1269            .by_tier
1270            .iter()
1271            .filter(|e| !e.value().is_empty())
1272            .map(|e| (*e.key(), e.value().len()))
1273            .collect();
1274        let by_continent: HashMap<String, usize> = self
1275            .by_continent
1276            .iter()
1277            .filter(|e| !e.value().is_empty())
1278            .map(|e| (e.key().clone(), e.value().len()))
1279            .collect();
1280
1281        MetadataStoreStats {
1282            total_nodes: self.node_count.load(Ordering::Relaxed),
1283            by_status,
1284            by_tier,
1285            by_continent,
1286            queries: self.query_count.load(Ordering::Relaxed),
1287            updates: self.update_count.load(Ordering::Relaxed),
1288        }
1289    }
1290
1291    /// Number of nodes
1292    pub fn len(&self) -> usize {
1293        self.node_count.load(Ordering::Relaxed)
1294    }
1295
1296    /// Check if empty
1297    pub fn is_empty(&self) -> bool {
1298        self.node_count.load(Ordering::Relaxed) == 0
1299    }
1300
1301    /// Clear all nodes
1302    ///
1303    /// Drains `nodes` FIRST and routes every drained metadata
1304    /// through `remove_from_indexes` — making the intermediate
1305    /// state consistent (nodes exist alongside their indexes
1306    /// throughout the drain). A naive `nodes.clear()` followed by
1307    /// six index `clear()`s in sequence would let a concurrent
1308    /// `upsert` landing between any two of those clears observe
1309    /// `nodes.get(&id) → None` (skipping `remove_from_indexes`),
1310    /// then `add_to_indexes` (writing into the SAME index maps
1311    /// `clear` is about to wipe), then `nodes.insert(...)` — the
1312    /// final state would be a node in `nodes` with NO index
1313    /// entries, invisible to every indexed query and only
1314    /// retrievable via the full-scan branch.
1315    ///
1316    /// With the drain-first ordering, any concurrent `upsert`
1317    /// landing during the drain either races BEFORE this function
1318    /// reads its key (the upsert wins; we drain its entry
1319    /// afterward) or AFTER (the upsert observes a cleared `nodes`
1320    /// and proceeds normally — no index drift, since
1321    /// `remove_from_indexes` only touches keys that exist in
1322    /// `nodes`). The final `clear`s on the index maps catch any
1323    /// residual entries the per-key path missed
1324    /// (defense-in-depth; should be no-ops on the happy path).
1325    pub fn clear(&self) {
1326        // `dashmap::DashMap` doesn't have a `drain()` that takes
1327        // ownership of every entry; use a remove-on-iter pattern.
1328        // Collect keys first, then remove and route through
1329        // `remove_from_indexes`. Holding the iter guard across
1330        // `remove` would deadlock — the keys vec is the
1331        // intermediate.
1332        let keys: Vec<NodeId> = self.nodes.iter().map(|r| *r.key()).collect();
1333        for key in keys {
1334            if let Some((_, meta)) = self.nodes.remove(&key) {
1335                self.remove_from_indexes(&meta);
1336                // Per-remove decrement (not store(0)) so a concurrent upsert
1337                // landing during the drain keeps its own +1 — the count stays
1338                // exact w.r.t. `nodes` rather than racing to a wrong 0.
1339                self.node_count.fetch_sub(1, Ordering::Relaxed);
1340            }
1341        }
1342        // Defense-in-depth: clear any residual entries that may
1343        // have leaked in via a concurrent upsert that landed
1344        // after our key-collection iterator finished but before
1345        // this point. These should be no-ops on the happy path.
1346        self.by_status.clear();
1347        self.by_tier.clear();
1348        self.by_continent.clear();
1349        self.by_tag.clear();
1350        self.by_role.clear();
1351        self.by_owner.clear();
1352    }
1353
1354    // Private helper to add node to indexes
1355    fn add_to_indexes(&self, meta: &NodeMetadata) {
1356        let node_id = meta.node_id;
1357
1358        // Status index
1359        self.by_status
1360            .entry(meta.status)
1361            .or_default()
1362            .insert(node_id);
1363
1364        // Tier index
1365        self.by_tier
1366            .entry(meta.topology.tier)
1367            .or_default()
1368            .insert(node_id);
1369
1370        // Continent index
1371        if let Some(ref loc) = meta.location {
1372            self.by_continent
1373                .entry(loc.region.continent().to_string())
1374                .or_default()
1375                .insert(node_id);
1376        }
1377
1378        // Tag index
1379        for tag in &meta.tags {
1380            self.by_tag.entry(tag.clone()).or_default().insert(node_id);
1381        }
1382
1383        // Role index
1384        for role in &meta.roles {
1385            self.by_role
1386                .entry(role.clone())
1387                .or_default()
1388                .insert(node_id);
1389        }
1390
1391        // Owner index
1392        if let Some(ref owner) = meta.owner {
1393            self.by_owner
1394                .entry(owner.clone())
1395                .or_default()
1396                .insert(node_id);
1397        }
1398    }
1399
1400    // Private helper to remove node from indexes
1401    fn remove_from_indexes(&self, meta: &NodeMetadata) {
1402        let node_id = meta.node_id;
1403
1404        // Status index
1405        if let Some(mut set) = self.by_status.get_mut(&meta.status) {
1406            set.remove(&node_id);
1407        }
1408
1409        // Tier index
1410        if let Some(mut set) = self.by_tier.get_mut(&meta.topology.tier) {
1411            set.remove(&node_id);
1412        }
1413
1414        // Continent index
1415        if let Some(ref loc) = meta.location {
1416            if let Some(mut set) = self.by_continent.get_mut(loc.region.continent()) {
1417                set.remove(&node_id);
1418            }
1419        }
1420
1421        // Tag index
1422        for tag in &meta.tags {
1423            if let Some(mut set) = self.by_tag.get_mut(tag) {
1424                set.remove(&node_id);
1425            }
1426        }
1427
1428        // Role index
1429        for role in &meta.roles {
1430            if let Some(mut set) = self.by_role.get_mut(role) {
1431                set.remove(&node_id);
1432            }
1433        }
1434
1435        // Owner index
1436        if let Some(ref owner) = meta.owner {
1437            if let Some(mut set) = self.by_owner.get_mut(owner) {
1438                set.remove(&node_id);
1439            }
1440        }
1441    }
1442}
1443
1444impl Default for MetadataStore {
1445    fn default() -> Self {
1446        Self::new()
1447    }
1448}
1449
1450#[cfg(test)]
1451mod tests {
1452    use super::*;
1453
1454    fn make_node_id(n: u8) -> NodeId {
1455        let mut id = [0u8; 32];
1456        id[0] = n;
1457        id
1458    }
1459
1460    #[test]
1461    fn test_location_distance() {
1462        // New York
1463        let ny = LocationInfo::new(Region::NorthAmerica("us-east".into()))
1464            .with_coordinates(40.7128, -74.0060);
1465
1466        // Los Angeles
1467        let la = LocationInfo::new(Region::NorthAmerica("us-west".into()))
1468            .with_coordinates(34.0522, -118.2437);
1469
1470        // London
1471        let london =
1472            LocationInfo::new(Region::Europe("uk".into())).with_coordinates(51.5074, -0.1278);
1473
1474        let ny_la = ny.distance_to(&la).unwrap();
1475        assert!(ny_la > 3900.0 && ny_la < 4000.0, "NY-LA: {}", ny_la);
1476
1477        let ny_london = ny.distance_to(&london).unwrap();
1478        assert!(
1479            ny_london > 5500.0 && ny_london < 5600.0,
1480            "NY-London: {}",
1481            ny_london
1482        );
1483
1484        assert!(ny.same_continent(&la));
1485        assert!(!ny.same_continent(&london));
1486    }
1487
1488    #[test]
1489    fn test_nat_connectivity() {
1490        assert!(NatType::None.can_connect_direct(&NatType::Symmetric));
1491        assert!(NatType::FullCone.can_connect_direct(&NatType::RestrictedCone));
1492        assert!(!NatType::Symmetric.can_connect_direct(&NatType::Symmetric));
1493    }
1494
1495    #[test]
1496    fn test_topology_score() {
1497        let basic = TopologyHints::new(NetworkTier::Consumer);
1498        let premium = TopologyHints::new(NetworkTier::Premium)
1499            .with_bandwidth(1000, 1000)
1500            .with_nat(NatType::None)
1501            .with_relay(100);
1502
1503        assert!(premium.connectivity_score() > basic.connectivity_score());
1504    }
1505
1506    #[test]
1507    fn test_node_metadata() {
1508        let node = NodeMetadata::new(make_node_id(1))
1509            .with_name("test-node")
1510            .with_owner("test-org")
1511            .with_tag("gpu")
1512            .with_role("worker")
1513            .with_status(NodeStatus::Online);
1514
1515        assert_eq!(node.name, Some("test-node".into()));
1516        assert!(node.tags.contains("gpu"));
1517        assert!(node.roles.contains("worker"));
1518        assert!(node.status.accepts_work());
1519    }
1520
1521    #[test]
1522    fn test_metadata_store_basic() {
1523        let store = MetadataStore::new();
1524
1525        let node1 = NodeMetadata::new(make_node_id(1))
1526            .with_name("node1")
1527            .with_status(NodeStatus::Online);
1528
1529        let node2 = NodeMetadata::new(make_node_id(2))
1530            .with_name("node2")
1531            .with_status(NodeStatus::Degraded);
1532
1533        store.upsert(node1).unwrap();
1534        store.upsert(node2).unwrap();
1535
1536        assert_eq!(store.len(), 2);
1537
1538        let retrieved = store.get(&make_node_id(1)).unwrap();
1539        assert_eq!(retrieved.name, Some("node1".into()));
1540
1541        store.remove(&make_node_id(1));
1542        assert_eq!(store.len(), 1);
1543        assert!(store.get(&make_node_id(1)).is_none());
1544    }
1545
1546    #[test]
1547    fn test_metadata_query() {
1548        let store = MetadataStore::new();
1549
1550        // Add nodes with different properties
1551        for i in 0..10 {
1552            let status = if i < 5 {
1553                NodeStatus::Online
1554            } else {
1555                NodeStatus::Degraded
1556            };
1557            let tier = if i < 3 {
1558                NetworkTier::Core
1559            } else {
1560                NetworkTier::Consumer
1561            };
1562
1563            let mut node = NodeMetadata::new(make_node_id(i))
1564                .with_status(status)
1565                .with_topology(TopologyHints::new(tier));
1566
1567            if i % 2 == 0 {
1568                node.tags.insert("even".into());
1569            }
1570
1571            store.upsert(node).unwrap();
1572        }
1573
1574        // Query by status
1575        let online = store.query(&MetadataQuery::new().with_status(NodeStatus::Online));
1576        assert_eq!(online.len(), 5);
1577
1578        // Query by tier
1579        let core = store.query(&MetadataQuery::new().with_min_tier(NetworkTier::Core));
1580        assert_eq!(core.len(), 3);
1581
1582        // Query accepting work
1583        let working = store.query(&MetadataQuery::new().accepting_work());
1584        assert_eq!(working.len(), 10); // Both Online and Degraded accept work
1585
1586        // Query with limit
1587        let limited = store.query(&MetadataQuery::new().with_limit(3));
1588        assert_eq!(limited.len(), 3);
1589    }
1590
1591    #[test]
1592    fn test_find_nearby() {
1593        let store = MetadataStore::new();
1594
1595        // Add nodes at different locations
1596        let locations = [
1597            (40.7128, -74.0060),  // NY
1598            (34.0522, -118.2437), // LA
1599            (51.5074, -0.1278),   // London
1600        ];
1601
1602        for (i, (lat, lon)) in locations.iter().enumerate() {
1603            let node = NodeMetadata::new(make_node_id(i as u8))
1604                .with_location(
1605                    LocationInfo::new(Region::NorthAmerica("test".into()))
1606                        .with_coordinates(*lat, *lon),
1607                )
1608                .with_status(NodeStatus::Online);
1609            store.upsert(node).unwrap();
1610        }
1611
1612        // Find nodes near NY
1613        let ny = LocationInfo::new(Region::NorthAmerica("test".into()))
1614            .with_coordinates(40.7128, -74.0060);
1615
1616        let nearby = store.find_nearby(&ny, 100.0, 10);
1617        assert_eq!(nearby.len(), 1); // Only NY itself is within 100km
1618
1619        let nearby = store.find_nearby(&ny, 5000.0, 10);
1620        assert_eq!(nearby.len(), 2); // NY and LA within 5000km
1621
1622        let nearby = store.find_nearby(&ny, 10000.0, 10);
1623        assert_eq!(nearby.len(), 3); // All within 10000km
1624    }
1625
1626    #[test]
1627    fn test_find_relays() {
1628        let store = MetadataStore::new();
1629
1630        let relay_node = NodeMetadata::new(make_node_id(1))
1631            .with_topology(TopologyHints::new(NetworkTier::Datacenter).with_relay(100))
1632            .with_status(NodeStatus::Online);
1633
1634        let normal_node = NodeMetadata::new(make_node_id(2))
1635            .with_topology(TopologyHints::new(NetworkTier::Consumer))
1636            .with_status(NodeStatus::Online);
1637
1638        store.upsert(relay_node).unwrap();
1639        store.upsert(normal_node).unwrap();
1640
1641        let relays = store.find_relays();
1642        assert_eq!(relays.len(), 1);
1643    }
1644
1645    #[test]
1646    fn test_version_conflict() {
1647        let store = MetadataStore::new();
1648
1649        let node = NodeMetadata::new(make_node_id(1));
1650        store.upsert(node.clone()).unwrap();
1651
1652        // Try to update with wrong version
1653        let result = store.update_versioned(node.clone(), 999);
1654        assert!(matches!(result, Err(MetadataError::VersionConflict { .. })));
1655
1656        // Update with correct version
1657        let result = store.update_versioned(node, 1);
1658        assert!(result.is_ok());
1659    }
1660
1661    #[test]
1662    fn test_capacity_limit() {
1663        let store = MetadataStore::with_capacity(2);
1664
1665        store.upsert(NodeMetadata::new(make_node_id(1))).unwrap();
1666        store.upsert(NodeMetadata::new(make_node_id(2))).unwrap();
1667
1668        let result = store.upsert(NodeMetadata::new(make_node_id(3)));
1669        assert!(matches!(result, Err(MetadataError::CapacityExceeded)));
1670
1671        // Can still update existing
1672        store.upsert(NodeMetadata::new(make_node_id(1))).unwrap();
1673    }
1674
1675    /// CR-30: pin the invariant that every `Arc<NodeMetadata>`
1676    /// returned from a read path (`get`, `query`, `find_nearby`,
1677    /// `best_for_routing`) satisfies [`NodeMetadata::validate_bounds`].
1678    /// Pre-CR-30 the bounds check ran only on `upsert` /
1679    /// `update_versioned`; if a future refactor adds a write path
1680    /// that bypasses both (e.g. snapshot restore that deserializes
1681    /// raw `NodeMetadata` and inserts it into `nodes` directly), an
1682    /// over-bounded entry could leak into reads. This test pins
1683    /// the read-side contract so a future maintainer either
1684    /// honours it on every new write path OR has to update the
1685    /// test.
1686    #[test]
1687    fn cr30_read_path_invariant_every_returned_node_passes_validate_bounds() {
1688        let store = MetadataStore::new();
1689        let mut node = NodeMetadata::new(make_node_id(1));
1690        node.tags.insert("training".into());
1691        store.upsert(node).unwrap();
1692
1693        // Read via `get`: the returned Arc must validate cleanly.
1694        let got = store.get(&make_node_id(1)).expect("inserted node");
1695        got.validate_bounds().expect(
1696            "CR-30: every node returned from MetadataStore::get MUST satisfy \
1697             validate_bounds. If this fires, a write path is bypassing \
1698             upsert's bound check.",
1699        );
1700
1701        // Read via `query`: same invariant.
1702        let q = MetadataQuery::new();
1703        for entry in store.query(&q) {
1704            entry.validate_bounds().expect(
1705                "CR-30: every node returned from MetadataStore::query MUST \
1706                 satisfy validate_bounds.",
1707            );
1708        }
1709    }
1710
1711    /// CR-18: pin the soft-cap race window. The capacity check sits
1712    /// outside the `DashMap::entry` write lock (cannot move it inside
1713    /// without `nodes.len()` self-deadlocking), so two concurrent
1714    /// upserts of distinct `node_id`s can both pass the
1715    /// `nodes.len() >= max` check and both insert past the cap. This
1716    /// is documented as acceptable behavior — `max_capacity` is a
1717    /// best-effort target, not a hard cap. The test intentionally
1718    /// exercises the race and pins that the `len()` may transiently
1719    /// exceed `max` so a future "fix" that turns this into a hard
1720    /// cap also has to update this test.
1721    ///
1722    /// (Mirrors the pattern in `TokenCache::insert_unchecked` and
1723    /// `ContextStore::create_context`.)
1724    #[test]
1725    fn cr18_capacity_check_is_a_soft_cap_under_concurrent_upserts() {
1726        use std::sync::Arc;
1727        use std::thread;
1728
1729        const MAX: usize = 4;
1730        const N_THREADS: usize = 16;
1731
1732        let store = Arc::new(MetadataStore::with_capacity(MAX));
1733        let barrier = Arc::new(std::sync::Barrier::new(N_THREADS));
1734
1735        let mut handles = Vec::with_capacity(N_THREADS);
1736        for t in 0..N_THREADS {
1737            let store = Arc::clone(&store);
1738            let barrier = Arc::clone(&barrier);
1739            handles.push(thread::spawn(move || {
1740                barrier.wait();
1741                let id = make_node_id(t as u8 + 1);
1742                store.upsert(NodeMetadata::new(id))
1743            }));
1744        }
1745        let mut accepted = 0usize;
1746        let mut rejected = 0usize;
1747        for h in handles {
1748            match h.join().unwrap() {
1749                Ok(()) => accepted += 1,
1750                Err(MetadataError::CapacityExceeded) => rejected += 1,
1751                Err(other) => panic!("unexpected upsert error: {other:?}"),
1752            }
1753        }
1754
1755        // CR-18: pin that at LEAST `MAX` upserts succeeded. The
1756        // total `accepted` may be > MAX under heavy concurrency
1757        // because the soft-cap check loses the race against
1758        // multiple concurrent inserters of distinct ids — that's
1759        // the documented behavior. Pre-CR-18 the docs didn't
1760        // call this out at the public-API level, so a caller
1761        // reading `with_capacity(N)` might assume a hard ceiling.
1762        assert!(
1763            accepted >= MAX,
1764            "at least the cap's worth must succeed; got {accepted}"
1765        );
1766        assert!(
1767            accepted + rejected == N_THREADS,
1768            "every upsert must surface either Ok or CapacityExceeded"
1769        );
1770        // The store SIZE may transiently equal `accepted`, which
1771        // can exceed MAX. Pin THAT property so the soft-cap
1772        // semantic is documented in code, not just docs.
1773        assert!(
1774            store.nodes.len() <= accepted,
1775            "store size must not exceed the count of successful upserts"
1776        );
1777        // If accepted > MAX, the soft-cap was crossed — that's
1778        // the documented limitation. Just confirm nothing's torn.
1779    }
1780
1781    // ========================================================================
1782    // NodeMetadata bounds must be enforced before indexing
1783    // ========================================================================
1784
1785    /// Oversized tag set is rejected by `upsert` before touching the
1786    /// index DashMaps. Pre-fix a peer could ship a single
1787    /// announcement with millions of unique tags and turn it into
1788    /// millions of `by_tag` entries.
1789    #[test]
1790    fn upsert_rejects_oversized_tags() {
1791        let store = MetadataStore::new();
1792        let mut node = NodeMetadata::new(make_node_id(1));
1793        for i in 0..(MAX_METADATA_TAGS + 1) {
1794            node.tags.insert(format!("t{}", i));
1795        }
1796        let result = store.upsert(node);
1797        assert!(
1798            matches!(result, Err(MetadataError::Invalid(_))),
1799            "oversized tags must surface as MetadataError::Invalid, got {:?}",
1800            result,
1801        );
1802    }
1803
1804    /// Oversized custom-map is rejected.
1805    #[test]
1806    fn upsert_rejects_oversized_custom_map() {
1807        let store = MetadataStore::new();
1808        let mut node = NodeMetadata::new(make_node_id(2));
1809        for i in 0..(MAX_METADATA_CUSTOM_ENTRIES + 1) {
1810            node.custom.insert(format!("k{}", i), "v".to_string());
1811        }
1812        assert!(matches!(store.upsert(node), Err(MetadataError::Invalid(_))));
1813    }
1814
1815    /// A single string field over the per-string cap is rejected.
1816    #[test]
1817    fn upsert_rejects_oversized_string_fields() {
1818        let store = MetadataStore::new();
1819        let huge = "x".repeat(MAX_METADATA_STRING_LEN + 1);
1820        let node = NodeMetadata::new(make_node_id(3)).with_name(huge);
1821        assert!(matches!(store.upsert(node), Err(MetadataError::Invalid(_))));
1822    }
1823
1824    /// Metadata at exactly the boundaries is accepted — pins the
1825    /// `<=` semantics so a future tightening to strict `<` doesn't
1826    /// silently break legitimate-but-large announcements.
1827    #[test]
1828    fn upsert_accepts_metadata_at_exact_boundaries() {
1829        let store = MetadataStore::new();
1830        let mut node = NodeMetadata::new(make_node_id(4));
1831        for i in 0..MAX_METADATA_TAGS {
1832            node.tags.insert(format!("t{}", i));
1833        }
1834        let name_at_cap = "x".repeat(MAX_METADATA_STRING_LEN);
1835        node = node.with_name(name_at_cap);
1836        store
1837            .upsert(node)
1838            .expect("metadata at the exact boundaries must be accepted");
1839    }
1840
1841    // ========================================================================
1842    // upsert must serialize concurrent writers on the same node_id
1843    // ========================================================================
1844
1845    /// Concurrent `upsert`s on the same `node_id` with DIFFERENT
1846    /// metadata must leave the inverted indexes consistent —
1847    /// exactly one (status, tag) pairing per node, matching the
1848    /// final stored metadata. Pre-fix the read-old happened
1849    /// outside any lock, so two threads could observe the same
1850    /// `old`, both `remove_from_indexes(old)`, both
1851    /// `add_to_indexes(new_a)` / `add_to_indexes(new_b)` into
1852    /// different buckets, and the loser's index entries leaked
1853    /// permanently.
1854    #[test]
1855    fn upsert_serializes_concurrent_writes_on_same_node_id() {
1856        use std::sync::Arc;
1857        use std::thread;
1858
1859        let store = Arc::new(MetadataStore::new());
1860        let node_id = make_node_id(42);
1861
1862        // Seed the store so both threads see an existing entry to
1863        // remove from indexes — the original race vector.
1864        let seed = NodeMetadata::new(node_id)
1865            .with_status(NodeStatus::Online)
1866            .with_tag("seed")
1867            .with_topology(TopologyHints::new(NetworkTier::Consumer));
1868        store.upsert(seed).unwrap();
1869
1870        // Two threads upsert different metadata on the same id.
1871        // Each contends with the other for the node's shard write
1872        // guard.
1873        let n_iters = 50;
1874        let store_a = store.clone();
1875        let store_b = store.clone();
1876        let h_a = thread::spawn(move || {
1877            for i in 0..n_iters {
1878                let node = NodeMetadata::new(node_id)
1879                    .with_status(NodeStatus::Online)
1880                    .with_tag(format!("a-{}", i))
1881                    .with_topology(TopologyHints::new(NetworkTier::Premium));
1882                store_a.upsert(node).unwrap();
1883            }
1884        });
1885        let h_b = thread::spawn(move || {
1886            for i in 0..n_iters {
1887                let node = NodeMetadata::new(node_id)
1888                    .with_status(NodeStatus::Degraded)
1889                    .with_tag(format!("b-{}", i))
1890                    .with_topology(TopologyHints::new(NetworkTier::Datacenter));
1891                store_b.upsert(node).unwrap();
1892            }
1893        });
1894        h_a.join().unwrap();
1895        h_b.join().unwrap();
1896
1897        // Final metadata: one of the two threads' last write
1898        // landed last. Whatever it is, the inverted indexes must
1899        // reflect ONLY that final metadata — no leftover
1900        // status/tag entries from the other thread.
1901        let final_meta = store.get(&node_id).expect("node must still exist");
1902        let final_status = final_meta.status;
1903        let final_tags: std::collections::HashSet<&str> =
1904            final_meta.tags.iter().map(|s| s.as_str()).collect();
1905
1906        // No status bucket OTHER than the final one may contain
1907        // this node_id.
1908        for status in [
1909            NodeStatus::Online,
1910            NodeStatus::Offline,
1911            NodeStatus::Degraded,
1912            NodeStatus::Starting,
1913            NodeStatus::Maintenance,
1914        ] {
1915            let bucket_has_node = store
1916                .by_status
1917                .get(&status)
1918                .map(|s| s.contains(&node_id))
1919                .unwrap_or(false);
1920            if status == final_status {
1921                assert!(
1922                    bucket_has_node,
1923                    "final status {:?} bucket must contain the node",
1924                    status
1925                );
1926            } else {
1927                assert!(
1928                    !bucket_has_node,
1929                    "stale status {:?} bucket must NOT contain the node",
1930                    status
1931                );
1932            }
1933        }
1934
1935        // No tag bucket OTHER than the final tags may contain
1936        // this node_id. Walk every tag bucket the threads might
1937        // have touched.
1938        for i in 0..n_iters {
1939            for prefix in ["a-", "b-"] {
1940                let tag = format!("{}{}", prefix, i);
1941                if final_tags.contains(tag.as_str()) {
1942                    continue;
1943                }
1944                let bucket_has_node = store
1945                    .by_tag
1946                    .get(&tag)
1947                    .map(|s| s.contains(&node_id))
1948                    .unwrap_or(false);
1949                assert!(
1950                    !bucket_has_node,
1951                    "stale tag '{}' bucket must NOT contain the node",
1952                    tag
1953                );
1954            }
1955        }
1956        // The seed tag must also be gone.
1957        let seed_bucket_has_node = store
1958            .by_tag
1959            .get("seed")
1960            .map(|s| s.contains(&node_id))
1961            .unwrap_or(false);
1962        assert!(
1963            !seed_bucket_has_node,
1964            "the original seed tag must have been removed"
1965        );
1966    }
1967
1968    #[test]
1969    fn test_stats() {
1970        let store = MetadataStore::new();
1971
1972        for i in 0..5 {
1973            let node = NodeMetadata::new(make_node_id(i))
1974                .with_status(if i < 3 {
1975                    NodeStatus::Online
1976                } else {
1977                    NodeStatus::Offline
1978                })
1979                .with_topology(TopologyHints::new(NetworkTier::Consumer));
1980            store.upsert(node).unwrap();
1981        }
1982
1983        // Perform some queries
1984        store.query(&MetadataQuery::new());
1985        store.query(&MetadataQuery::new());
1986
1987        let stats = store.stats();
1988        assert_eq!(stats.total_nodes, 5);
1989        assert_eq!(stats.by_status.get(&NodeStatus::Online), Some(&3));
1990        assert_eq!(stats.by_status.get(&NodeStatus::Offline), Some(&2));
1991        assert_eq!(stats.queries, 2);
1992        assert_eq!(stats.updates, 5);
1993    }
1994
1995    /// `stats()` is now computed from the inverted indexes and
1996    /// `total_nodes` / `len()` from an O(1) counter. Verify both stay
1997    /// exact across upsert, status change, remove (which can leave an
1998    /// empty index bucket — that bucket must NOT surface as a 0 count),
1999    /// and clear.
2000    #[test]
2001    fn stats_and_len_track_inserts_removes_and_status_changes() {
2002        let store = MetadataStore::new();
2003        for i in 0..4u8 {
2004            store
2005                .upsert(NodeMetadata::new(make_node_id(i)).with_status(NodeStatus::Online))
2006                .unwrap();
2007        }
2008        assert_eq!(store.len(), 4);
2009        assert_eq!(store.stats().total_nodes, 4);
2010        assert_eq!(store.stats().by_status.get(&NodeStatus::Online), Some(&4));
2011
2012        // Re-upsert (same id, new status) must not grow len, and must move
2013        // the node between status buckets.
2014        store
2015            .upsert(NodeMetadata::new(make_node_id(0)).with_status(NodeStatus::Offline))
2016            .unwrap();
2017        assert_eq!(store.len(), 4, "re-upsert must not change node count");
2018        let s = store.stats();
2019        assert_eq!(s.by_status.get(&NodeStatus::Online), Some(&3));
2020        assert_eq!(s.by_status.get(&NodeStatus::Offline), Some(&1));
2021
2022        // Remove the only Offline node — its bucket empties and must drop
2023        // out of the histogram entirely (not report 0).
2024        store.remove(&make_node_id(0));
2025        assert_eq!(store.len(), 3);
2026        let s = store.stats();
2027        assert_eq!(s.total_nodes, 3);
2028        assert_eq!(
2029            s.by_status.get(&NodeStatus::Offline),
2030            None,
2031            "emptied status bucket must not appear in stats"
2032        );
2033
2034        store.clear();
2035        assert_eq!(store.len(), 0);
2036        assert!(store.is_empty());
2037        assert_eq!(store.stats().total_nodes, 0);
2038        assert!(store.stats().by_status.is_empty());
2039    }
2040
2041    // ========================================================================
2042    // Cubic-ai P2: validate_bounds must walk nested structs, not just
2043    // top-level fields and collection counts
2044    // ========================================================================
2045
2046    /// `validate_bounds` rejects metadata whose nested
2047    /// `LocationInfo::provider` exceeds `MAX_METADATA_STRING_LEN`.
2048    /// Pre-fix only top-level strings (`name`, `description`,
2049    /// `owner`) and collection counts were checked, so a peer could
2050    /// stuff arbitrary multi-megabyte data into `location.provider`
2051    /// (or any other LocationInfo string) and slip past every guard.
2052    #[test]
2053    fn validate_bounds_rejects_oversized_location_string() {
2054        let mut node = NodeMetadata::new(make_node_id(1));
2055        node.location = Some(LocationInfo {
2056            region: Region::NorthAmerica("us-east".into()),
2057            zone: None,
2058            latitude: None,
2059            longitude: None,
2060            asn: None,
2061            provider: Some("p".repeat(MAX_METADATA_STRING_LEN + 1)),
2062            datacenter: None,
2063            country_code: None,
2064            city: None,
2065        });
2066
2067        match node.validate_bounds() {
2068            Err(MetadataError::Invalid(msg)) => {
2069                assert!(
2070                    msg.contains("location.provider"),
2071                    "rejection must name the offending field; got: {msg}",
2072                );
2073            }
2074            other => panic!("expected Invalid for oversized location.provider, got {other:?}"),
2075        }
2076    }
2077
2078    /// `validate_bounds` rejects metadata whose `hop_distances` contains
2079    /// a key longer than `MAX_METADATA_STRING_LEN`. Pre-fix only the
2080    /// map cardinality was capped — a single oversized key inside an
2081    /// otherwise small map smuggled the check.
2082    #[test]
2083    fn validate_bounds_rejects_oversized_hop_distances_key() {
2084        let mut topo = TopologyHints::new(NetworkTier::Consumer);
2085        topo.hop_distances
2086            .insert("k".repeat(MAX_METADATA_STRING_LEN + 1), 3);
2087        let node = NodeMetadata::new(make_node_id(1)).with_topology(topo);
2088
2089        match node.validate_bounds() {
2090            Err(MetadataError::Invalid(msg)) => {
2091                assert!(
2092                    msg.contains("hop_distances key"),
2093                    "rejection must name the offending field; got: {msg}",
2094                );
2095            }
2096            other => {
2097                panic!("expected Invalid for oversized hop_distances key, got {other:?}")
2098            }
2099        }
2100    }
2101
2102    /// Belt-and-braces happy path: a complete metadata bundle with
2103    /// every nested field at exactly `MAX_METADATA_STRING_LEN` must
2104    /// pass — the new checks are bound-strict (`>`, not `>=`), so a
2105    /// future tightening that flipped the comparator would lock out
2106    /// legitimate boundary callers and this test would catch it.
2107    #[test]
2108    fn validate_bounds_accepts_at_boundary_lengths() {
2109        let mut node = NodeMetadata::new(make_node_id(1));
2110        node.location = Some(LocationInfo {
2111            region: Region::Europe("uk".into()),
2112            zone: Some("z".repeat(MAX_METADATA_STRING_LEN)),
2113            latitude: None,
2114            longitude: None,
2115            asn: None,
2116            provider: Some("p".repeat(MAX_METADATA_STRING_LEN)),
2117            datacenter: Some("d".repeat(MAX_METADATA_STRING_LEN)),
2118            country_code: Some("c".repeat(MAX_METADATA_STRING_LEN)),
2119            city: Some("y".repeat(MAX_METADATA_STRING_LEN)),
2120        });
2121        let mut topo = TopologyHints::new(NetworkTier::Consumer);
2122        topo.hop_distances
2123            .insert("k".repeat(MAX_METADATA_STRING_LEN), 1);
2124        node.topology = topo;
2125
2126        node.validate_bounds()
2127            .expect("at-boundary nested strings must validate");
2128    }
2129
2130    // ---------- Pure-function coverage ----------
2131    //
2132    // Existing tests construct nodes and run queries but never
2133    // call these accessors. Each is a tiny pure function; the
2134    // risk if they regress is silent (e.g., a routing scheduler
2135    // suddenly preferring Draining over Online).
2136
2137    #[test]
2138    fn region_zone_returns_inner_zone_for_every_variant() {
2139        assert_eq!(Region::NorthAmerica("us-east".into()).zone(), "us-east");
2140        assert_eq!(Region::SouthAmerica("br-sp".into()).zone(), "br-sp");
2141        assert_eq!(Region::Europe("eu-west".into()).zone(), "eu-west");
2142        assert_eq!(Region::AsiaPacific("ap-1".into()).zone(), "ap-1");
2143        assert_eq!(Region::MiddleEast("me-1".into()).zone(), "me-1");
2144        assert_eq!(Region::Africa("af-1".into()).zone(), "af-1");
2145        assert_eq!(Region::Custom("custom-z".into()).zone(), "custom-z");
2146    }
2147
2148    #[test]
2149    fn node_status_routing_priority_orders_variants_correctly() {
2150        // Online must outrank everything; Offline/ShuttingDown/
2151        // Maintenance must be the lowest tier. A regression that
2152        // swaps any pair here would silently mis-route traffic.
2153        assert!(NodeStatus::Online.routing_priority() > NodeStatus::Degraded.routing_priority());
2154        assert!(NodeStatus::Degraded.routing_priority() > NodeStatus::Starting.routing_priority());
2155        assert!(NodeStatus::Starting.routing_priority() > NodeStatus::Draining.routing_priority());
2156        assert_eq!(NodeStatus::Offline.routing_priority(), 0);
2157        assert_eq!(NodeStatus::ShuttingDown.routing_priority(), 0);
2158        assert_eq!(NodeStatus::Maintenance.routing_priority(), 0);
2159        assert_eq!(NodeStatus::Online.routing_priority(), 5);
2160    }
2161
2162    #[test]
2163    fn average_latency_handles_empty_and_populated() {
2164        let mut topo = TopologyHints::new(NetworkTier::Consumer);
2165        assert!(topo.average_latency().is_none(), "empty must be None");
2166
2167        topo.update_latency(make_node_id(1), 100);
2168        topo.update_latency(make_node_id(2), 200);
2169        topo.update_latency(make_node_id(3), 300);
2170        assert_eq!(topo.average_latency(), Some(200.0));
2171    }
2172
2173    #[test]
2174    fn touch_advances_version_and_updated_at() {
2175        let mut node = NodeMetadata::new(make_node_id(1));
2176        let v0 = node.version;
2177        let t0 = node.updated_at;
2178        // Tiny sleep so wall-clock ms tick — `updated_at` is ms-resolution.
2179        std::thread::sleep(std::time::Duration::from_millis(5));
2180        node.touch();
2181        assert_eq!(node.version, v0 + 1);
2182        assert!(
2183            node.updated_at >= t0,
2184            "updated_at must not go backward (got {} from {t0})",
2185            node.updated_at,
2186        );
2187    }
2188
2189    #[test]
2190    fn is_stale_compares_age_against_max_age() {
2191        let mut node = NodeMetadata::new(make_node_id(1));
2192        // Force updated_at into the past so age() reports a known
2193        // gap regardless of wall-clock drift on the test host.
2194        node.updated_at = node.updated_at.saturating_sub(60_000);
2195        assert!(node.is_stale(Duration::from_secs(10)));
2196        assert!(!node.is_stale(Duration::from_secs(3600)));
2197    }
2198
2199    // ---------- validate_bounds error branches ----------
2200
2201    #[test]
2202    fn validate_bounds_rejects_oversized_owner() {
2203        let mut node = NodeMetadata::new(make_node_id(1));
2204        node.owner = Some("o".repeat(MAX_METADATA_STRING_LEN + 1));
2205        let err = node.validate_bounds().unwrap_err();
2206        let msg = format!("{}", err);
2207        assert!(msg.contains("owner"), "error must name 'owner': {msg}");
2208    }
2209
2210    #[test]
2211    fn validate_bounds_rejects_too_many_tags() {
2212        let mut node = NodeMetadata::new(make_node_id(1));
2213        for i in 0..MAX_METADATA_TAGS + 1 {
2214            node.tags.insert(format!("t{i}"));
2215        }
2216        assert!(matches!(
2217            node.validate_bounds(),
2218            Err(MetadataError::Invalid(_))
2219        ));
2220    }
2221
2222    #[test]
2223    fn validate_bounds_rejects_oversized_role() {
2224        let mut node = NodeMetadata::new(make_node_id(1));
2225        node.roles.insert("r".repeat(MAX_METADATA_STRING_LEN + 1));
2226        let err = node.validate_bounds().unwrap_err();
2227        assert!(format!("{}", err).contains("role"));
2228    }
2229
2230    #[test]
2231    fn validate_bounds_rejects_oversized_custom_map_entries() {
2232        let mut node = NodeMetadata::new(make_node_id(1));
2233
2234        // Oversized key.
2235        node.custom
2236            .insert("k".repeat(MAX_METADATA_STRING_LEN + 1), "v".into());
2237        let err = node.validate_bounds().unwrap_err();
2238        assert!(format!("{}", err).contains("custom key"));
2239
2240        // Oversized value.
2241        node.custom.clear();
2242        node.custom
2243            .insert("k".into(), "v".repeat(MAX_METADATA_STRING_LEN + 1));
2244        let err = node.validate_bounds().unwrap_err();
2245        assert!(format!("{}", err).contains("custom value"));
2246    }
2247
2248    #[test]
2249    fn validate_bounds_rejects_too_many_custom_entries() {
2250        let mut node = NodeMetadata::new(make_node_id(1));
2251        for i in 0..MAX_METADATA_CUSTOM_ENTRIES + 1 {
2252            node.custom.insert(format!("k{i}"), "v".into());
2253        }
2254        assert!(matches!(
2255            node.validate_bounds(),
2256            Err(MetadataError::Invalid(_))
2257        ));
2258    }
2259}