// clawft_kernel/cluster.rs
//! Cluster membership and node fabric.
//!
//! Defines types for multi-node WeftOS clusters where agents
//! can migrate between nodes. Each node is a WeftOS kernel
//! instance -- native binary on cloud/edge, or WASM in a browser.
//!
//! # Feature Gate
//!
//! All types compile unconditionally. Actual peer discovery,
//! health monitoring, and cross-node communication require the
//! `cluster` feature flag and a distributed networking layer.

// Standard library.
use std::collections::HashMap;
use std::time::Instant;

// Third-party: timestamps, concurrent map, serialization, structured logging.
use chrono::{DateTime, Utc};
use dashmap::DashMap;
use serde::{Deserialize, Serialize};
use tracing::{debug, warn};
20
/// Unique node identifier (UUID or DID string).
///
/// Plain `String` alias; no format validation is performed here.
pub type NodeId = String;
23
/// Node platform type.
///
/// `#[non_exhaustive]`: new platforms may be added without a breaking
/// change, so downstream crates must keep a wildcard match arm.
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodePlatform {
    /// Native binary on a cloud VM or bare metal server.
    CloudNative,
    /// Native binary on an edge device.
    Edge,
    /// WASM module running in a browser tab.
    Browser,
    /// WASI module in a container.
    Wasi,
    /// Custom platform label.
    Custom(String),
}
39
40impl std::fmt::Display for NodePlatform {
41    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42        match self {
43            NodePlatform::CloudNative => write!(f, "cloud-native"),
44            NodePlatform::Edge => write!(f, "edge"),
45            NodePlatform::Browser => write!(f, "browser"),
46            NodePlatform::Wasi => write!(f, "wasi"),
47            NodePlatform::Custom(name) => write!(f, "custom({name})"),
48        }
49    }
50}
51
/// Node health state.
///
/// Typical lifecycle: `Joining` -> `Active`, then either
/// `Suspect` -> `Unreachable` (missed heartbeats) or
/// `Leaving` -> `Left` (graceful shutdown). A heartbeat moves a
/// `Suspect` node back to `Active` (see `ClusterMembership::heartbeat`).
#[non_exhaustive]
#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
pub enum NodeState {
    /// Node is joining the cluster.
    Joining,
    /// Node is healthy and active.
    Active,
    /// Node is suspected unreachable (missed heartbeats).
    Suspect,
    /// Node has been confirmed unreachable.
    Unreachable,
    /// Node is gracefully leaving the cluster.
    Leaving,
    /// Node has left the cluster.
    Left,
}
69
70impl std::fmt::Display for NodeState {
71    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72        match self {
73            NodeState::Joining => write!(f, "joining"),
74            NodeState::Active => write!(f, "active"),
75            NodeState::Suspect => write!(f, "suspect"),
76            NodeState::Unreachable => write!(f, "unreachable"),
77            NodeState::Leaving => write!(f, "leaving"),
78            NodeState::Left => write!(f, "left"),
79        }
80    }
81}
82
/// Information about a peer node in the cluster.
///
/// Serializable snapshot of a peer as tracked by [`ClusterMembership`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct PeerNode {
    /// Unique node identifier.
    pub id: NodeId,

    /// Human-readable node name.
    pub name: String,

    /// Node platform.
    pub platform: NodePlatform,

    /// Current state in the cluster.
    pub state: NodeState,

    /// Network address for direct communication.
    /// Omitted from the serialized form when `None`.
    #[serde(default, skip_serializing_if = "Option::is_none")]
    pub address: Option<String>,

    /// When this node was first seen.
    pub first_seen: DateTime<Utc>,

    /// When the last heartbeat was received.
    pub last_heartbeat: DateTime<Utc>,

    /// Capabilities this node advertises.
    /// Defaults to empty when absent in the serialized form.
    #[serde(default)]
    pub capabilities: Vec<String>,

    /// Labels for scheduling and filtering.
    /// Defaults to empty when absent in the serialized form.
    #[serde(default)]
    pub labels: HashMap<String, String>,
}
116
/// Cluster configuration.
///
/// Every field except `node_id` has a serde default, so a minimal
/// config document only needs to supply the node identifier.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ClusterConfig {
    /// This node's identifier.
    pub node_id: NodeId,

    /// This node's display name (default: `"local"`).
    #[serde(default = "default_node_name")]
    pub node_name: String,

    /// This node's platform type (default: `CloudNative`).
    #[serde(default = "default_platform")]
    pub platform: NodePlatform,

    /// Heartbeat interval in seconds (default: 5).
    #[serde(default = "default_heartbeat_interval")]
    pub heartbeat_interval_secs: u64,

    /// How many missed heartbeats before marking a node suspect (default: 3).
    #[serde(default = "default_suspect_threshold")]
    pub suspect_threshold: u32,

    /// How many missed heartbeats before marking a node unreachable (default: 10).
    #[serde(default = "default_unreachable_threshold")]
    pub unreachable_threshold: u32,

    /// Maximum cluster size (0 = unlimited).
    #[serde(default)]
    pub max_nodes: u32,

    /// Address to bind the mesh listener (e.g., "0.0.0.0:9470").
    #[serde(default)]
    pub bind_address: Option<String>,

    /// Seed peers for bootstrap discovery.
    #[serde(default)]
    pub seed_peers: Vec<String>,

    /// Path to Ed25519 identity key file.
    #[serde(default)]
    pub identity_key_path: Option<std::path::PathBuf>,
}
159
/// Serde default for `ClusterConfig::node_name`.
fn default_node_name() -> String {
    String::from("local")
}
163
/// Serde default for `ClusterConfig::platform`.
fn default_platform() -> NodePlatform {
    NodePlatform::CloudNative
}

/// Serde default: heartbeat every 5 seconds.
fn default_heartbeat_interval() -> u64 {
    5
}

/// Serde default: 3 missed heartbeats before a node is suspect.
fn default_suspect_threshold() -> u32 {
    3
}

/// Serde default: 10 missed heartbeats before a node is unreachable.
fn default_unreachable_threshold() -> u32 {
    10
}
179
impl Default for ClusterConfig {
    /// Defaults mirror the serde `default_*` helpers.
    ///
    /// Note: `node_id` is a fresh random UUID on every call, so two
    /// `ClusterConfig::default()` values are never equal.
    fn default() -> Self {
        Self {
            node_id: uuid::Uuid::new_v4().to_string(),
            node_name: default_node_name(),
            platform: default_platform(),
            heartbeat_interval_secs: default_heartbeat_interval(),
            suspect_threshold: default_suspect_threshold(),
            unreachable_threshold: default_unreachable_threshold(),
            max_nodes: 0,
            bind_address: None,
            seed_peers: Vec::new(),
            identity_key_path: None,
        }
    }
}
196
// ── ECC capability advertisement (K3c) ────────────────────────────

/// ECC capabilities advertised by a cluster node.
///
/// Populated during boot-time calibration and advertised to peers
/// so they can route ECC-related requests to capable nodes.
/// Only compiled with the `ecc` feature.
#[cfg(feature = "ecc")]
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeEccCapability {
    /// Calibrated cognitive tick interval (milliseconds).
    pub tick_interval_ms: u32,
    /// 95th percentile compute time per tick (microseconds).
    pub compute_p95_us: u32,
    /// Headroom ratio (actual_compute / budget).
    pub headroom_ratio: f32,
    /// Number of vectors in the HNSW index.
    pub hnsw_vector_count: u32,
    /// Number of edges in the causal graph.
    pub causal_edge_count: u32,
    /// Whether this node can perform spectral analysis.
    pub spectral_capable: bool,
    /// Unix timestamp when calibration was performed.
    pub calibrated_at: u64,
}
221
// ── Node identity (K6 mesh networking) ─────────────────────────────

/// Node identity derived from Ed25519 keypair.
///
/// The `node_id` is derived as `hex(SHA-256(pubkey)[0..16])`,
/// providing a stable, compact identifier tied to the cryptographic key.
/// Fields are private; use the accessor methods on the impl.
#[cfg(any(feature = "mesh", feature = "exochain"))]
pub struct NodeIdentity {
    /// Ed25519 signing key (private; never exposed directly).
    keypair: ed25519_dalek::SigningKey,
    /// Derived node identifier (32 lowercase hex chars).
    node_id: String,
}
235
#[cfg(any(feature = "mesh", feature = "exochain"))]
impl NodeIdentity {
    /// Generate a new random identity.
    ///
    /// The node id is the lowercase hex encoding of the first 16 bytes
    /// of `SHA-256(public key)` — 32 characters total.
    pub fn generate() -> Self {
        use sha2::Digest;
        use std::fmt::Write;

        let keypair = ed25519_dalek::SigningKey::generate(&mut rand::thread_rng());
        let digest = sha2::Sha256::digest(keypair.verifying_key().to_bytes());

        // Hex-encode digest[0..16]; writing into a String cannot fail,
        // so the write! result is safely ignored.
        let mut node_id = String::with_capacity(32);
        for byte in &digest[..16] {
            let _ = write!(node_id, "{byte:02x}");
        }

        Self { keypair, node_id }
    }

    /// Get this node's identifier.
    pub fn node_id(&self) -> &str {
        &self.node_id
    }

    /// Get the public verification key.
    pub fn public_key(&self) -> ed25519_dalek::VerifyingKey {
        self.keypair.verifying_key()
    }

    /// Sign arbitrary data with this node's private key.
    pub fn sign(&self, data: &[u8]) -> ed25519_dalek::Signature {
        use ed25519_dalek::Signer;
        self.keypair.sign(data)
    }
}
274
/// Cluster membership errors.
///
/// `#[non_exhaustive]`: new variants may be added; downstream matches
/// need a wildcard arm.
#[non_exhaustive]
#[derive(Debug, thiserror::Error)]
pub enum ClusterError {
    /// Node already in the cluster.
    #[error("node already exists: '{node_id}'")]
    NodeAlreadyExists {
        /// Node ID.
        node_id: NodeId,
    },

    /// Node not found.
    #[error("node not found: '{node_id}'")]
    NodeNotFound {
        /// Node ID.
        node_id: NodeId,
    },

    /// Cluster is at maximum capacity.
    #[error("cluster full: max {max} nodes")]
    ClusterFull {
        /// Maximum node count.
        max: u32,
    },

    /// Mesh networking error.
    #[error("mesh error: {0}")]
    Mesh(String),

    /// Authentication failed during cluster join.
    #[error("authentication failed: {0}")]
    AuthFailed(String),

    /// Invalid state transition.
    #[error("invalid node state transition: {from} -> {to}")]
    InvalidTransition {
        /// Current state.
        from: String,
        /// Requested state.
        to: String,
    },

    /// Peer additions are too frequent (rate limited).
    #[error("rate limited: peer additions too frequent")]
    RateLimited,
}
321
// ── Cluster membership ─────────────────────────────────────────────
//
// Tracks which nodes are part of the cluster, their health state,
// and capabilities. Actual peer discovery and heartbeat monitoring
// require a networking layer not included here.
//
// (The paragraph above was previously a `///` doc comment that sat
// directly over `KNOWN_CAPABILITIES`, so rustdoc attached it to the
// const instead of the `ClusterMembership` struct it describes.)

/// Known valid capabilities for cluster nodes.
///
/// Peers advertising capabilities outside this set are still accepted
/// for forward compatibility, but a warning is logged (see `add_peer`).
const KNOWN_CAPABILITIES: &[&str] = &[
    "ipc", "chain", "tree", "governance", "ecc", "wasm", "containers", "apps",
    "mesh", "discovery", "heartbeat", "compute",
];

/// Validate peer capabilities against the known set.
///
/// Returns the subset of `capabilities` not present in
/// [`KNOWN_CAPABILITIES`], preserving input order. An empty result
/// means every capability is known.
fn validate_capabilities(capabilities: &[String]) -> Vec<String> {
    capabilities
        .iter()
        .filter(|c| !KNOWN_CAPABILITIES.contains(&c.as_str()))
        .cloned()
        .collect()
}
342
/// Cluster membership tracker.
///
/// Tracks which nodes are part of the cluster, their health state,
/// and capabilities. Actual peer discovery and heartbeat monitoring
/// require a networking layer not included here.
pub struct ClusterMembership {
    /// Local node configuration (id, thresholds, limits).
    config: ClusterConfig,
    /// Known peers keyed by node id (concurrent map).
    peers: DashMap<NodeId, PeerNode>,
    /// Timestamp of last peer addition (rate limiting).
    last_peer_add: std::sync::Mutex<Option<Instant>>,
    /// Minimum interval between peer additions.
    min_peer_add_interval: std::time::Duration,
}
351
impl ClusterMembership {
    /// Create a new cluster membership tracker.
    ///
    /// Starts with no peers and a 100 ms minimum interval between
    /// peer additions (see [`ClusterMembership::add_peer`]).
    pub fn new(config: ClusterConfig) -> Self {
        Self {
            config,
            peers: DashMap::new(),
            last_peer_add: std::sync::Mutex::new(None),
            min_peer_add_interval: std::time::Duration::from_millis(100),
        }
    }

    /// Set the minimum interval between peer additions (builder style).
    ///
    /// Pass `Duration::ZERO` to effectively disable rate limiting
    /// (used by the tests in this module).
    pub fn with_min_peer_interval(mut self, interval: std::time::Duration) -> Self {
        self.min_peer_add_interval = interval;
        self
    }

    /// Get the cluster configuration.
    pub fn config(&self) -> &ClusterConfig {
        &self.config
    }

    /// Get this node's ID.
    pub fn local_node_id(&self) -> &str {
        &self.config.node_id
    }

    /// Register a peer node.
    ///
    /// Rate-limited to at most one addition per `min_peer_add_interval`
    /// (default 100 ms) to prevent join-flood attacks.
    ///
    /// Fails with `RateLimited`, `NodeAlreadyExists`, or `ClusterFull`
    /// (when `max_nodes > 0`). Note that the rate-limit timestamp is
    /// refreshed even when the add later fails a duplicate/full check.
    pub fn add_peer(&self, peer: PeerNode) -> Result<(), ClusterError> {
        // Rate-limit peer additions.
        {
            // Lock scope is kept minimal; `unwrap` panics only on a
            // poisoned mutex (another thread panicked while holding it).
            let mut last = self.last_peer_add.lock().unwrap();
            if let Some(ts) = *last {
                if ts.elapsed() < self.min_peer_add_interval {
                    return Err(ClusterError::RateLimited);
                }
            }
            *last = Some(Instant::now());
        }

        if self.peers.contains_key(&peer.id) {
            return Err(ClusterError::NodeAlreadyExists { node_id: peer.id });
        }

        if self.config.max_nodes > 0 && self.peers.len() as u32 >= self.config.max_nodes {
            return Err(ClusterError::ClusterFull {
                max: self.config.max_nodes,
            });
        }

        // Validate capabilities -- warn on unknown but still allow (forward compat).
        let unknown = validate_capabilities(&peer.capabilities);
        if !unknown.is_empty() {
            warn!(
                node_id = %peer.id,
                unknown_capabilities = ?unknown,
                "peer advertises unknown capabilities"
            );
        }

        debug!(node_id = %peer.id, name = %peer.name, "adding peer to cluster");
        self.peers.insert(peer.id.clone(), peer);
        Ok(())
    }

    /// Remove a peer node, returning its last known record.
    ///
    /// Fails with `NodeNotFound` if the id is not tracked.
    pub fn remove_peer(&self, node_id: &str) -> Result<PeerNode, ClusterError> {
        self.peers
            .remove(node_id)
            .map(|(_, peer)| peer)
            .ok_or_else(|| ClusterError::NodeNotFound {
                node_id: node_id.to_owned(),
            })
    }

    /// Update a peer's state.
    ///
    /// No transition validation is performed here; any state may be
    /// set. Fails with `NodeNotFound` if the id is not tracked.
    pub fn update_state(&self, node_id: &str, new_state: NodeState) -> Result<(), ClusterError> {
        let mut entry = self
            .peers
            .get_mut(node_id)
            .ok_or_else(|| ClusterError::NodeNotFound {
                node_id: node_id.to_owned(),
            })?;
        entry.state = new_state;
        Ok(())
    }

    /// Record a heartbeat from a peer.
    ///
    /// Refreshes `last_heartbeat` and promotes a `Suspect` node back to
    /// `Active`. Other states (e.g. `Unreachable`) are left unchanged.
    pub fn heartbeat(&self, node_id: &str) -> Result<(), ClusterError> {
        let mut entry = self
            .peers
            .get_mut(node_id)
            .ok_or_else(|| ClusterError::NodeNotFound {
                node_id: node_id.to_owned(),
            })?;
        entry.last_heartbeat = Utc::now();
        if entry.state == NodeState::Suspect {
            entry.state = NodeState::Active;
        }
        Ok(())
    }

    /// Get a snapshot of a peer's state.
    ///
    /// Returns a clone so no map lock is held by the caller.
    pub fn get_peer(&self, node_id: &str) -> Option<PeerNode> {
        self.peers.get(node_id).map(|e| e.value().clone())
    }

    /// List all peers with their states.
    pub fn list_peers(&self) -> Vec<(NodeId, NodeState, NodePlatform)> {
        self.peers
            .iter()
            .map(|e| (e.key().clone(), e.state.clone(), e.platform.clone()))
            .collect()
    }

    /// Count peers by state.
    pub fn count_by_state(&self, state: &NodeState) -> usize {
        self.peers.iter().filter(|e| &e.state == state).count()
    }

    /// Count total peers.
    pub fn len(&self) -> usize {
        self.peers.len()
    }

    /// Check if cluster has no peers.
    pub fn is_empty(&self) -> bool {
        self.peers.is_empty()
    }

    /// Add a peer and optionally create a resource tree node.
    ///
    /// Tree-node creation failures are logged at debug level and do
    /// not fail the peer addition itself.
    #[cfg(feature = "exochain")]
    pub fn add_peer_with_tree(
        &self,
        peer: PeerNode,
        tree: &std::sync::Mutex<exo_resource_tree::ResourceTree>,
    ) -> Result<(), ClusterError> {
        let peer_name = peer.name.clone();
        self.add_peer(peer)?;

        // Create tree node for this peer
        // NOTE(review): `unwrap` here panics on a poisoned tree mutex --
        // confirm that is the intended failure mode.
        let mut tree = tree.lock().unwrap();
        let peer_id =
            exo_resource_tree::ResourceId::new(format!("/network/peers/{peer_name}"));
        let parent = exo_resource_tree::ResourceId::new("/network/peers");
        if let Err(e) = tree.insert(peer_id, exo_resource_tree::ResourceKind::Device, parent) {
            tracing::debug!(peer = %peer_name, error = %e, "failed to create tree node for peer");
        }

        Ok(())
    }

    /// Get all active peer node IDs.
    pub fn active_peers(&self) -> Vec<NodeId> {
        self.peers
            .iter()
            .filter(|e| e.state == NodeState::Active)
            .map(|e| e.key().clone())
            .collect()
    }
}
516
// ── ClusterService (native coordinator layer) ────────────────────────
//
// Wraps ruvector's ClusterManager behind the `cluster` feature flag.
// Only runs on native coordinator nodes; browser/edge nodes participate
// through the universal ClusterMembership layer via WebSocket.
522
#[cfg(feature = "cluster")]
mod cluster_service {
    use std::sync::Arc;
    use std::time::Duration;

    use async_trait::async_trait;
    use tracing::{debug, info};

    use ruvector_cluster::{
        ClusterConfig as RuvectorClusterConfig, ClusterManager, ClusterNode, DiscoveryService,
        NodeStatus, ShardInfo, StaticDiscovery,
    };

    use crate::cluster::{ClusterMembership, NodePlatform, NodeState, PeerNode};
    use crate::health::HealthStatus;
    use crate::service::{ServiceType, SystemService};
    use clawft_types::config::ClusterNetworkConfig;

    /// Native coordinator cluster service.
    ///
    /// Wraps ruvector's [`ClusterManager`] and syncs discovered nodes
    /// into the kernel's universal [`ClusterMembership`] tracker.
    pub struct ClusterService {
        /// Underlying ruvector cluster manager.
        manager: ClusterManager,
        /// Shared kernel-wide peer tracker (all platforms).
        membership: Arc<ClusterMembership>,
        /// Network configuration this service was built from.
        config: ClusterNetworkConfig,
    }

    impl ClusterService {
        /// Create a new cluster service.
        ///
        /// `membership` is the kernel's universal peer tracker that
        /// all platforms share. The ClusterService syncs ruvector
        /// native node state into it.
        pub fn new(
            config: ClusterNetworkConfig,
            node_id: String,
            discovery: Box<dyn DiscoveryService>,
            membership: Arc<ClusterMembership>,
        ) -> Result<Self, ruvector_cluster::ClusterError> {
            // Translate the kernel-level network config into ruvector's
            // own config type (seconds -> Duration).
            let ruvector_config = RuvectorClusterConfig {
                replication_factor: config.replication_factor,
                shard_count: config.shard_count,
                heartbeat_interval: Duration::from_secs(config.heartbeat_interval_secs),
                node_timeout: Duration::from_secs(config.node_timeout_secs),
                enable_consensus: config.enable_consensus,
                min_quorum_size: config.min_quorum_size,
            };

            let manager = ClusterManager::new(ruvector_config, node_id, discovery)?;

            Ok(Self {
                manager,
                membership,
                config,
            })
        }

        /// Create with default config and static (empty) discovery.
        pub fn with_defaults(
            node_id: String,
            membership: Arc<ClusterMembership>,
        ) -> Result<Self, ruvector_cluster::ClusterError> {
            let config = ClusterNetworkConfig::default();
            let discovery = Box::new(StaticDiscovery::new(vec![]));
            Self::new(config, node_id, discovery, membership)
        }

        /// Sync ruvector's native node list into the kernel's
        /// [`ClusterMembership`] tracker.
        ///
        /// Converts ruvector [`ClusterNode`] entries into kernel
        /// [`PeerNode`] entries, mapping `SocketAddr` → `String`
        /// and `NodeStatus` → `NodeState`. Existing peers get a state
        /// update + heartbeat; new ones are added (add failures such as
        /// rate limiting are logged at debug level, not propagated).
        pub fn sync_to_membership(&self) {
            let nodes = self.manager.list_nodes();
            for node in &nodes {
                let peer = Self::cluster_node_to_peer(node);
                if self.membership.get_peer(&peer.id).is_some() {
                    // Update existing peer's state
                    let new_state = Self::map_status(node.status);
                    let _ = self.membership.update_state(&peer.id, new_state);
                    let _ = self.membership.heartbeat(&peer.id);
                } else {
                    // Add new peer
                    if let Err(e) = self.membership.add_peer(peer) {
                        debug!(error = %e, "failed to sync node to membership");
                    }
                }
            }
        }

        /// Get the cluster network configuration.
        pub fn config(&self) -> &ClusterNetworkConfig {
            &self.config
        }

        /// Get the underlying cluster manager (for advanced operations).
        pub fn manager(&self) -> &ClusterManager {
            &self.manager
        }

        /// Get cluster statistics.
        pub fn stats(&self) -> ruvector_cluster::ClusterStats {
            self.manager.get_stats()
        }

        /// List all shards.
        pub fn list_shards(&self) -> Vec<ShardInfo> {
            self.manager.list_shards()
        }

        /// List all ruvector nodes.
        pub fn list_nodes(&self) -> Vec<ClusterNode> {
            self.manager.list_nodes()
        }

        /// Convert a ruvector `NodeStatus` to a kernel `NodeState`.
        ///
        /// All consensus roles (leader/follower/candidate) count as
        /// `Active`; only `Offline` maps to `Unreachable`.
        fn map_status(status: NodeStatus) -> NodeState {
            match status {
                NodeStatus::Leader | NodeStatus::Follower | NodeStatus::Candidate => {
                    NodeState::Active
                }
                NodeStatus::Offline => NodeState::Unreachable,
            }
        }

        /// Convert a ruvector `ClusterNode` to a kernel `PeerNode`.
        ///
        /// The display name comes from the node's `"name"` metadata key,
        /// falling back to the node id. Platform is assumed native
        /// (ruvector nodes are coordinators).
        fn cluster_node_to_peer(node: &ClusterNode) -> PeerNode {
            PeerNode {
                id: node.node_id.clone(),
                name: node
                    .metadata
                    .get("name")
                    .cloned()
                    .unwrap_or_else(|| node.node_id.clone()),
                platform: NodePlatform::CloudNative,
                state: Self::map_status(node.status),
                address: Some(node.address.to_string()),
                first_seen: node.last_seen, // best approximation
                last_heartbeat: node.last_seen,
                capabilities: Vec::new(),
                labels: node.metadata.clone(),
            }
        }
    }

    #[async_trait]
    impl SystemService for ClusterService {
        fn name(&self) -> &str {
            "cluster"
        }

        fn service_type(&self) -> ServiceType {
            ServiceType::Core
        }

        async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            info!("starting cluster service");
            self.manager
                .start()
                .await
                .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
            self.sync_to_membership();
            info!(
                nodes = self.manager.list_nodes().len(),
                shards = self.manager.list_shards().len(),
                "cluster service started"
            );
            Ok(())
        }

        async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
            info!("stopping cluster service");
            // Mark self as leaving in membership
            // NOTE(review): `membership` tracks *peers*, so the local
            // node id is usually absent and this update no-ops with
            // NodeNotFound (ignored). Also, the ruvector manager is not
            // explicitly stopped here -- confirm both are intentional.
            let local_id = self.membership.local_node_id().to_owned();
            let _ = self.membership.update_state(&local_id, NodeState::Left);
            Ok(())
        }

        async fn health_check(&self) -> HealthStatus {
            let stats = self.manager.get_stats();
            if stats.healthy_nodes > 0 {
                HealthStatus::Healthy
            } else {
                HealthStatus::Degraded("no healthy cluster nodes".into())
            }
        }
    }
}
713
// Re-export so callers can use `crate::cluster::ClusterService` directly.
#[cfg(feature = "cluster")]
pub use cluster_service::ClusterService;
716
// Unit tests for the membership types and tracker (no networking).
#[cfg(test)]
mod tests {
    use super::*;

    /// Build a minimal active peer with one known capability and one label.
    fn make_peer(id: &str, name: &str) -> PeerNode {
        PeerNode {
            id: id.into(),
            name: name.into(),
            platform: NodePlatform::CloudNative,
            state: NodeState::Active,
            address: Some("10.0.0.1:8080".into()),
            first_seen: Utc::now(),
            last_heartbeat: Utc::now(),
            capabilities: vec!["compute".into()],
            labels: HashMap::from([("region".into(), "us-east".into())]),
        }
    }

    #[test]
    fn default_config() {
        let config = ClusterConfig::default();
        assert_eq!(config.node_name, "local");
        assert_eq!(config.heartbeat_interval_secs, 5);
        assert_eq!(config.suspect_threshold, 3);
    }

    #[test]
    fn config_serde_roundtrip() {
        let config = ClusterConfig {
            node_id: "node-1".into(),
            node_name: "primary".into(),
            platform: NodePlatform::Edge,
            heartbeat_interval_secs: 10,
            suspect_threshold: 5,
            unreachable_threshold: 15,
            max_nodes: 100,
            ..Default::default()
        };
        let json = serde_json::to_string(&config).unwrap();
        let restored: ClusterConfig = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.node_name, "primary");
        assert_eq!(restored.max_nodes, 100);
    }

    #[test]
    fn node_platform_display() {
        assert_eq!(NodePlatform::CloudNative.to_string(), "cloud-native");
        assert_eq!(NodePlatform::Browser.to_string(), "browser");
        assert_eq!(
            NodePlatform::Custom("k8s".into()).to_string(),
            "custom(k8s)"
        );
    }

    #[test]
    fn node_state_display() {
        assert_eq!(NodeState::Active.to_string(), "active");
        assert_eq!(NodeState::Suspect.to_string(), "suspect");
        assert_eq!(NodeState::Unreachable.to_string(), "unreachable");
    }

    /// Helper: create a ClusterMembership with rate limiting disabled for tests.
    fn make_cluster(config: ClusterConfig) -> ClusterMembership {
        ClusterMembership::new(config)
            .with_min_peer_interval(std::time::Duration::ZERO)
    }

    #[test]
    fn add_and_list_peers() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        cluster.add_peer(make_peer("node-2", "beta")).unwrap();

        let peers = cluster.list_peers();
        assert_eq!(peers.len(), 2);
    }

    #[test]
    fn add_duplicate_fails() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        assert!(matches!(
            cluster.add_peer(make_peer("node-1", "alpha-dup")),
            Err(ClusterError::NodeAlreadyExists { .. })
        ));
    }

    #[test]
    fn cluster_full() {
        let config = ClusterConfig {
            max_nodes: 1,
            ..Default::default()
        };
        let cluster = make_cluster(config);
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        assert!(matches!(
            cluster.add_peer(make_peer("node-2", "beta")),
            Err(ClusterError::ClusterFull { .. })
        ));
    }

    #[test]
    fn remove_peer() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        let removed = cluster.remove_peer("node-1").unwrap();
        assert_eq!(removed.name, "alpha");
        assert!(cluster.is_empty());
    }

    #[test]
    fn remove_nonexistent_fails() {
        let cluster = make_cluster(ClusterConfig::default());
        assert!(matches!(
            cluster.remove_peer("nope"),
            Err(ClusterError::NodeNotFound { .. })
        ));
    }

    #[test]
    fn update_state() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        cluster.update_state("node-1", NodeState::Suspect).unwrap();
        let peer = cluster.get_peer("node-1").unwrap();
        assert_eq!(peer.state, NodeState::Suspect);
    }

    #[test]
    fn heartbeat_clears_suspect() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        cluster.update_state("node-1", NodeState::Suspect).unwrap();
        cluster.heartbeat("node-1").unwrap();
        let peer = cluster.get_peer("node-1").unwrap();
        assert_eq!(peer.state, NodeState::Active);
    }

    #[test]
    fn count_by_state() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        cluster.add_peer(make_peer("node-2", "beta")).unwrap();
        cluster.update_state("node-2", NodeState::Suspect).unwrap();
        assert_eq!(cluster.count_by_state(&NodeState::Active), 1);
        assert_eq!(cluster.count_by_state(&NodeState::Suspect), 1);
    }

    #[test]
    fn active_peers() {
        let cluster = make_cluster(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
        cluster.add_peer(make_peer("node-2", "beta")).unwrap();
        cluster.update_state("node-2", NodeState::Leaving).unwrap();
        let active = cluster.active_peers();
        assert_eq!(active.len(), 1);
        assert_eq!(active[0], "node-1");
    }

    #[test]
    fn peer_serde_roundtrip() {
        let peer = make_peer("node-1", "alpha");
        let json = serde_json::to_string(&peer).unwrap();
        let restored: PeerNode = serde_json::from_str(&json).unwrap();
        assert_eq!(restored.id, "node-1");
        assert_eq!(restored.capabilities, vec!["compute"]);
    }

    #[test]
    fn cluster_error_display() {
        let err = ClusterError::NodeNotFound {
            node_id: "node-1".into(),
        };
        assert!(err.to_string().contains("node-1"));

        let err = ClusterError::ClusterFull { max: 10 };
        assert!(err.to_string().contains("10"));
    }

    #[test]
    fn mesh_and_auth_error_display() {
        let err = ClusterError::Mesh("connection refused".into());
        assert!(err.to_string().contains("connection refused"));

        let err = ClusterError::AuthFailed("bad signature".into());
        assert!(err.to_string().contains("bad signature"));
    }

    #[test]
    fn default_config_new_fields() {
        let config = ClusterConfig::default();
        assert!(config.bind_address.is_none());
        assert!(config.seed_peers.is_empty());
        assert!(config.identity_key_path.is_none());
    }

    #[test]
    fn rate_limited_peer_additions() {
        // Use the default 100ms rate limit (do NOT use make_cluster here).
        let cluster = ClusterMembership::new(ClusterConfig::default());
        cluster.add_peer(make_peer("node-1", "alpha")).unwrap();

        // Second add immediately should be rate limited.
        let result = cluster.add_peer(make_peer("node-2", "beta"));
        assert!(
            matches!(result, Err(ClusterError::RateLimited)),
            "expected RateLimited, got {result:?}"
        );
    }

    #[test]
    fn rate_limited_error_display() {
        let err = ClusterError::RateLimited;
        assert!(err.to_string().contains("rate limited"));
    }

    #[test]
    fn validate_known_capabilities() {
        let known = vec!["ipc".into(), "mesh".into(), "chain".into()];
        let unknown = validate_capabilities(&known);
        assert!(unknown.is_empty());
    }

    #[test]
    fn validate_unknown_capabilities() {
        let caps = vec!["ipc".into(), "teleport".into(), "quantum".into()];
        let unknown = validate_capabilities(&caps);
        assert_eq!(unknown, vec!["teleport", "quantum"]);
    }

    #[test]
    fn add_peer_with_unknown_capabilities_succeeds() {
        let cluster = make_cluster(ClusterConfig::default());
        let mut peer = make_peer("node-1", "alpha");
        peer.capabilities = vec!["ipc".into(), "teleport".into()];
        // Should succeed (warning logged, but not rejected).
        cluster.add_peer(peer).unwrap();
        assert_eq!(cluster.len(), 1);
    }
}
957
// Tests for the Ed25519-backed NodeIdentity (mesh/exochain only).
#[cfg(test)]
#[cfg(any(feature = "mesh", feature = "exochain"))]
mod mesh_tests {
    use super::*;

    #[test]
    fn node_identity_unique_ids() {
        // Two random keypairs must derive distinct node ids.
        let id1 = NodeIdentity::generate();
        let id2 = NodeIdentity::generate();
        assert_ne!(id1.node_id(), id2.node_id());
    }

    #[test]
    fn node_identity_sign_verify() {
        use ed25519_dalek::Verifier;

        // A signature must verify against the identity's own public key.
        let identity = NodeIdentity::generate();
        let data = b"hello mesh";
        let sig = identity.sign(data);
        assert!(identity.public_key().verify(data, &sig).is_ok());
    }

    #[test]
    fn node_identity_id_is_32_hex_chars() {
        let identity = NodeIdentity::generate();
        let nid = identity.node_id();
        assert_eq!(nid.len(), 32, "node_id should be 32 hex chars (16 bytes)");
        assert!(
            nid.chars().all(|c| c.is_ascii_hexdigit()),
            "node_id must be hex: {nid}"
        );
    }
}