1use std::collections::HashMap;
14use std::time::Instant;
15
16use chrono::{DateTime, Utc};
17use dashmap::DashMap;
18use serde::{Deserialize, Serialize};
19use tracing::{debug, warn};
20
21pub type NodeId = String;
23
24#[non_exhaustive]
26#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
27pub enum NodePlatform {
28 CloudNative,
30 Edge,
32 Browser,
34 Wasi,
36 Custom(String),
38}
39
40impl std::fmt::Display for NodePlatform {
41 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
42 match self {
43 NodePlatform::CloudNative => write!(f, "cloud-native"),
44 NodePlatform::Edge => write!(f, "edge"),
45 NodePlatform::Browser => write!(f, "browser"),
46 NodePlatform::Wasi => write!(f, "wasi"),
47 NodePlatform::Custom(name) => write!(f, "custom({name})"),
48 }
49 }
50}
51
52#[non_exhaustive]
54#[derive(Debug, Clone, PartialEq, Eq, Serialize, Deserialize)]
55pub enum NodeState {
56 Joining,
58 Active,
60 Suspect,
62 Unreachable,
64 Leaving,
66 Left,
68}
69
70impl std::fmt::Display for NodeState {
71 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
72 match self {
73 NodeState::Joining => write!(f, "joining"),
74 NodeState::Active => write!(f, "active"),
75 NodeState::Suspect => write!(f, "suspect"),
76 NodeState::Unreachable => write!(f, "unreachable"),
77 NodeState::Leaving => write!(f, "leaving"),
78 NodeState::Left => write!(f, "left"),
79 }
80 }
81}
82
83#[derive(Debug, Clone, Serialize, Deserialize)]
85pub struct PeerNode {
86 pub id: NodeId,
88
89 pub name: String,
91
92 pub platform: NodePlatform,
94
95 pub state: NodeState,
97
98 #[serde(default, skip_serializing_if = "Option::is_none")]
100 pub address: Option<String>,
101
102 pub first_seen: DateTime<Utc>,
104
105 pub last_heartbeat: DateTime<Utc>,
107
108 #[serde(default)]
110 pub capabilities: Vec<String>,
111
112 #[serde(default)]
114 pub labels: HashMap<String, String>,
115}
116
117#[derive(Debug, Clone, Serialize, Deserialize)]
119pub struct ClusterConfig {
120 pub node_id: NodeId,
122
123 #[serde(default = "default_node_name")]
125 pub node_name: String,
126
127 #[serde(default = "default_platform")]
129 pub platform: NodePlatform,
130
131 #[serde(default = "default_heartbeat_interval")]
133 pub heartbeat_interval_secs: u64,
134
135 #[serde(default = "default_suspect_threshold")]
137 pub suspect_threshold: u32,
138
139 #[serde(default = "default_unreachable_threshold")]
141 pub unreachable_threshold: u32,
142
143 #[serde(default)]
145 pub max_nodes: u32,
146
147 #[serde(default)]
149 pub bind_address: Option<String>,
150
151 #[serde(default)]
153 pub seed_peers: Vec<String>,
154
155 #[serde(default)]
157 pub identity_key_path: Option<std::path::PathBuf>,
158}
159
160fn default_node_name() -> String {
161 "local".into()
162}
163
164fn default_platform() -> NodePlatform {
165 NodePlatform::CloudNative
166}
167
168fn default_heartbeat_interval() -> u64 {
169 5
170}
171
172fn default_suspect_threshold() -> u32 {
173 3
174}
175
176fn default_unreachable_threshold() -> u32 {
177 10
178}
179
180impl Default for ClusterConfig {
181 fn default() -> Self {
182 Self {
183 node_id: uuid::Uuid::new_v4().to_string(),
184 node_name: default_node_name(),
185 platform: default_platform(),
186 heartbeat_interval_secs: default_heartbeat_interval(),
187 suspect_threshold: default_suspect_threshold(),
188 unreachable_threshold: default_unreachable_threshold(),
189 max_nodes: 0,
190 bind_address: None,
191 seed_peers: Vec::new(),
192 identity_key_path: None,
193 }
194 }
195}
196
197#[cfg(feature = "ecc")]
204#[derive(Debug, Clone, Serialize, Deserialize)]
205pub struct NodeEccCapability {
206 pub tick_interval_ms: u32,
208 pub compute_p95_us: u32,
210 pub headroom_ratio: f32,
212 pub hnsw_vector_count: u32,
214 pub causal_edge_count: u32,
216 pub spectral_capable: bool,
218 pub calibrated_at: u64,
220}
221
222#[cfg(any(feature = "mesh", feature = "exochain"))]
229pub struct NodeIdentity {
230 keypair: ed25519_dalek::SigningKey,
232 node_id: String,
234}
235
236#[cfg(any(feature = "mesh", feature = "exochain"))]
237impl NodeIdentity {
238 pub fn generate() -> Self {
240 use sha2::Digest;
241
242 let mut csprng = rand::thread_rng();
243 let keypair = ed25519_dalek::SigningKey::generate(&mut csprng);
244 let pubkey_bytes = keypair.verifying_key().to_bytes();
245
246 let hash = sha2::Sha256::digest(pubkey_bytes);
247 let node_id = hash[..16]
248 .iter()
249 .fold(String::with_capacity(32), |mut s, b| {
250 use std::fmt::Write;
251 let _ = write!(s, "{b:02x}");
252 s
253 });
254
255 Self { keypair, node_id }
256 }
257
258 pub fn node_id(&self) -> &str {
260 &self.node_id
261 }
262
263 pub fn public_key(&self) -> ed25519_dalek::VerifyingKey {
265 self.keypair.verifying_key()
266 }
267
268 pub fn sign(&self, data: &[u8]) -> ed25519_dalek::Signature {
270 use ed25519_dalek::Signer;
271 self.keypair.sign(data)
272 }
273}
274
275#[non_exhaustive]
277#[derive(Debug, thiserror::Error)]
278pub enum ClusterError {
279 #[error("node already exists: '{node_id}'")]
281 NodeAlreadyExists {
282 node_id: NodeId,
284 },
285
286 #[error("node not found: '{node_id}'")]
288 NodeNotFound {
289 node_id: NodeId,
291 },
292
293 #[error("cluster full: max {max} nodes")]
295 ClusterFull {
296 max: u32,
298 },
299
300 #[error("mesh error: {0}")]
302 Mesh(String),
303
304 #[error("authentication failed: {0}")]
306 AuthFailed(String),
307
308 #[error("invalid node state transition: {from} -> {to}")]
310 InvalidTransition {
311 from: String,
313 to: String,
315 },
316
317 #[error("rate limited: peer additions too frequent")]
319 RateLimited,
320}
321
322const KNOWN_CAPABILITIES: &[&str] = &[
329 "ipc", "chain", "tree", "governance", "ecc", "wasm", "containers", "apps",
330 "mesh", "discovery", "heartbeat", "compute",
331];
332
333fn validate_capabilities(capabilities: &[String]) -> Vec<String> {
336 capabilities
337 .iter()
338 .filter(|c| !KNOWN_CAPABILITIES.contains(&c.as_str()))
339 .cloned()
340 .collect()
341}
342
343pub struct ClusterMembership {
344 config: ClusterConfig,
345 peers: DashMap<NodeId, PeerNode>,
346 last_peer_add: std::sync::Mutex<Option<Instant>>,
348 min_peer_add_interval: std::time::Duration,
350}
351
352impl ClusterMembership {
353 pub fn new(config: ClusterConfig) -> Self {
355 Self {
356 config,
357 peers: DashMap::new(),
358 last_peer_add: std::sync::Mutex::new(None),
359 min_peer_add_interval: std::time::Duration::from_millis(100),
360 }
361 }
362
363 pub fn with_min_peer_interval(mut self, interval: std::time::Duration) -> Self {
365 self.min_peer_add_interval = interval;
366 self
367 }
368
369 pub fn config(&self) -> &ClusterConfig {
371 &self.config
372 }
373
374 pub fn local_node_id(&self) -> &str {
376 &self.config.node_id
377 }
378
379 pub fn add_peer(&self, peer: PeerNode) -> Result<(), ClusterError> {
384 {
386 let mut last = self.last_peer_add.lock().unwrap();
387 if let Some(ts) = *last {
388 if ts.elapsed() < self.min_peer_add_interval {
389 return Err(ClusterError::RateLimited);
390 }
391 }
392 *last = Some(Instant::now());
393 }
394
395 if self.peers.contains_key(&peer.id) {
396 return Err(ClusterError::NodeAlreadyExists { node_id: peer.id });
397 }
398
399 if self.config.max_nodes > 0 && self.peers.len() as u32 >= self.config.max_nodes {
400 return Err(ClusterError::ClusterFull {
401 max: self.config.max_nodes,
402 });
403 }
404
405 let unknown = validate_capabilities(&peer.capabilities);
407 if !unknown.is_empty() {
408 warn!(
409 node_id = %peer.id,
410 unknown_capabilities = ?unknown,
411 "peer advertises unknown capabilities"
412 );
413 }
414
415 debug!(node_id = %peer.id, name = %peer.name, "adding peer to cluster");
416 self.peers.insert(peer.id.clone(), peer);
417 Ok(())
418 }
419
420 pub fn remove_peer(&self, node_id: &str) -> Result<PeerNode, ClusterError> {
422 self.peers
423 .remove(node_id)
424 .map(|(_, peer)| peer)
425 .ok_or_else(|| ClusterError::NodeNotFound {
426 node_id: node_id.to_owned(),
427 })
428 }
429
430 pub fn update_state(&self, node_id: &str, new_state: NodeState) -> Result<(), ClusterError> {
432 let mut entry = self
433 .peers
434 .get_mut(node_id)
435 .ok_or_else(|| ClusterError::NodeNotFound {
436 node_id: node_id.to_owned(),
437 })?;
438 entry.state = new_state;
439 Ok(())
440 }
441
442 pub fn heartbeat(&self, node_id: &str) -> Result<(), ClusterError> {
444 let mut entry = self
445 .peers
446 .get_mut(node_id)
447 .ok_or_else(|| ClusterError::NodeNotFound {
448 node_id: node_id.to_owned(),
449 })?;
450 entry.last_heartbeat = Utc::now();
451 if entry.state == NodeState::Suspect {
452 entry.state = NodeState::Active;
453 }
454 Ok(())
455 }
456
457 pub fn get_peer(&self, node_id: &str) -> Option<PeerNode> {
459 self.peers.get(node_id).map(|e| e.value().clone())
460 }
461
462 pub fn list_peers(&self) -> Vec<(NodeId, NodeState, NodePlatform)> {
464 self.peers
465 .iter()
466 .map(|e| (e.key().clone(), e.state.clone(), e.platform.clone()))
467 .collect()
468 }
469
470 pub fn count_by_state(&self, state: &NodeState) -> usize {
472 self.peers.iter().filter(|e| &e.state == state).count()
473 }
474
475 pub fn len(&self) -> usize {
477 self.peers.len()
478 }
479
480 pub fn is_empty(&self) -> bool {
482 self.peers.is_empty()
483 }
484
485 #[cfg(feature = "exochain")]
487 pub fn add_peer_with_tree(
488 &self,
489 peer: PeerNode,
490 tree: &std::sync::Mutex<exo_resource_tree::ResourceTree>,
491 ) -> Result<(), ClusterError> {
492 let peer_name = peer.name.clone();
493 self.add_peer(peer)?;
494
495 let mut tree = tree.lock().unwrap();
497 let peer_id =
498 exo_resource_tree::ResourceId::new(format!("/network/peers/{peer_name}"));
499 let parent = exo_resource_tree::ResourceId::new("/network/peers");
500 if let Err(e) = tree.insert(peer_id, exo_resource_tree::ResourceKind::Device, parent) {
501 tracing::debug!(peer = %peer_name, error = %e, "failed to create tree node for peer");
502 }
503
504 Ok(())
505 }
506
507 pub fn active_peers(&self) -> Vec<NodeId> {
509 self.peers
510 .iter()
511 .filter(|e| e.state == NodeState::Active)
512 .map(|e| e.key().clone())
513 .collect()
514 }
515}
516
517#[cfg(feature = "cluster")]
524mod cluster_service {
525 use std::sync::Arc;
526 use std::time::Duration;
527
528 use async_trait::async_trait;
529 use tracing::{debug, info};
530
531 use ruvector_cluster::{
532 ClusterConfig as RuvectorClusterConfig, ClusterManager, ClusterNode, DiscoveryService,
533 NodeStatus, ShardInfo, StaticDiscovery,
534 };
535
536 use crate::cluster::{ClusterMembership, NodePlatform, NodeState, PeerNode};
537 use crate::health::HealthStatus;
538 use crate::service::{ServiceType, SystemService};
539 use clawft_types::config::ClusterNetworkConfig;
540
541 pub struct ClusterService {
546 manager: ClusterManager,
547 membership: Arc<ClusterMembership>,
548 config: ClusterNetworkConfig,
549 }
550
551 impl ClusterService {
552 pub fn new(
558 config: ClusterNetworkConfig,
559 node_id: String,
560 discovery: Box<dyn DiscoveryService>,
561 membership: Arc<ClusterMembership>,
562 ) -> Result<Self, ruvector_cluster::ClusterError> {
563 let ruvector_config = RuvectorClusterConfig {
564 replication_factor: config.replication_factor,
565 shard_count: config.shard_count,
566 heartbeat_interval: Duration::from_secs(config.heartbeat_interval_secs),
567 node_timeout: Duration::from_secs(config.node_timeout_secs),
568 enable_consensus: config.enable_consensus,
569 min_quorum_size: config.min_quorum_size,
570 };
571
572 let manager = ClusterManager::new(ruvector_config, node_id, discovery)?;
573
574 Ok(Self {
575 manager,
576 membership,
577 config,
578 })
579 }
580
581 pub fn with_defaults(
583 node_id: String,
584 membership: Arc<ClusterMembership>,
585 ) -> Result<Self, ruvector_cluster::ClusterError> {
586 let config = ClusterNetworkConfig::default();
587 let discovery = Box::new(StaticDiscovery::new(vec![]));
588 Self::new(config, node_id, discovery, membership)
589 }
590
591 pub fn sync_to_membership(&self) {
598 let nodes = self.manager.list_nodes();
599 for node in &nodes {
600 let peer = Self::cluster_node_to_peer(node);
601 if self.membership.get_peer(&peer.id).is_some() {
602 let new_state = Self::map_status(node.status);
604 let _ = self.membership.update_state(&peer.id, new_state);
605 let _ = self.membership.heartbeat(&peer.id);
606 } else {
607 if let Err(e) = self.membership.add_peer(peer) {
609 debug!(error = %e, "failed to sync node to membership");
610 }
611 }
612 }
613 }
614
615 pub fn config(&self) -> &ClusterNetworkConfig {
617 &self.config
618 }
619
620 pub fn manager(&self) -> &ClusterManager {
622 &self.manager
623 }
624
625 pub fn stats(&self) -> ruvector_cluster::ClusterStats {
627 self.manager.get_stats()
628 }
629
630 pub fn list_shards(&self) -> Vec<ShardInfo> {
632 self.manager.list_shards()
633 }
634
635 pub fn list_nodes(&self) -> Vec<ClusterNode> {
637 self.manager.list_nodes()
638 }
639
640 fn map_status(status: NodeStatus) -> NodeState {
642 match status {
643 NodeStatus::Leader | NodeStatus::Follower | NodeStatus::Candidate => {
644 NodeState::Active
645 }
646 NodeStatus::Offline => NodeState::Unreachable,
647 }
648 }
649
650 fn cluster_node_to_peer(node: &ClusterNode) -> PeerNode {
652 PeerNode {
653 id: node.node_id.clone(),
654 name: node
655 .metadata
656 .get("name")
657 .cloned()
658 .unwrap_or_else(|| node.node_id.clone()),
659 platform: NodePlatform::CloudNative,
660 state: Self::map_status(node.status),
661 address: Some(node.address.to_string()),
662 first_seen: node.last_seen, last_heartbeat: node.last_seen,
664 capabilities: Vec::new(),
665 labels: node.metadata.clone(),
666 }
667 }
668 }
669
670 #[async_trait]
671 impl SystemService for ClusterService {
672 fn name(&self) -> &str {
673 "cluster"
674 }
675
676 fn service_type(&self) -> ServiceType {
677 ServiceType::Core
678 }
679
680 async fn start(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
681 info!("starting cluster service");
682 self.manager
683 .start()
684 .await
685 .map_err(|e| -> Box<dyn std::error::Error + Send + Sync> { Box::new(e) })?;
686 self.sync_to_membership();
687 info!(
688 nodes = self.manager.list_nodes().len(),
689 shards = self.manager.list_shards().len(),
690 "cluster service started"
691 );
692 Ok(())
693 }
694
695 async fn stop(&self) -> Result<(), Box<dyn std::error::Error + Send + Sync>> {
696 info!("stopping cluster service");
697 let local_id = self.membership.local_node_id().to_owned();
699 let _ = self.membership.update_state(&local_id, NodeState::Left);
700 Ok(())
701 }
702
703 async fn health_check(&self) -> HealthStatus {
704 let stats = self.manager.get_stats();
705 if stats.healthy_nodes > 0 {
706 HealthStatus::Healthy
707 } else {
708 HealthStatus::Degraded("no healthy cluster nodes".into())
709 }
710 }
711 }
712}
713
714#[cfg(feature = "cluster")]
715pub use cluster_service::ClusterService;
716
717#[cfg(test)]
718mod tests {
719 use super::*;
720
721 fn make_peer(id: &str, name: &str) -> PeerNode {
722 PeerNode {
723 id: id.into(),
724 name: name.into(),
725 platform: NodePlatform::CloudNative,
726 state: NodeState::Active,
727 address: Some("10.0.0.1:8080".into()),
728 first_seen: Utc::now(),
729 last_heartbeat: Utc::now(),
730 capabilities: vec!["compute".into()],
731 labels: HashMap::from([("region".into(), "us-east".into())]),
732 }
733 }
734
735 #[test]
736 fn default_config() {
737 let config = ClusterConfig::default();
738 assert_eq!(config.node_name, "local");
739 assert_eq!(config.heartbeat_interval_secs, 5);
740 assert_eq!(config.suspect_threshold, 3);
741 }
742
743 #[test]
744 fn config_serde_roundtrip() {
745 let config = ClusterConfig {
746 node_id: "node-1".into(),
747 node_name: "primary".into(),
748 platform: NodePlatform::Edge,
749 heartbeat_interval_secs: 10,
750 suspect_threshold: 5,
751 unreachable_threshold: 15,
752 max_nodes: 100,
753 ..Default::default()
754 };
755 let json = serde_json::to_string(&config).unwrap();
756 let restored: ClusterConfig = serde_json::from_str(&json).unwrap();
757 assert_eq!(restored.node_name, "primary");
758 assert_eq!(restored.max_nodes, 100);
759 }
760
761 #[test]
762 fn node_platform_display() {
763 assert_eq!(NodePlatform::CloudNative.to_string(), "cloud-native");
764 assert_eq!(NodePlatform::Browser.to_string(), "browser");
765 assert_eq!(
766 NodePlatform::Custom("k8s".into()).to_string(),
767 "custom(k8s)"
768 );
769 }
770
771 #[test]
772 fn node_state_display() {
773 assert_eq!(NodeState::Active.to_string(), "active");
774 assert_eq!(NodeState::Suspect.to_string(), "suspect");
775 assert_eq!(NodeState::Unreachable.to_string(), "unreachable");
776 }
777
778 fn make_cluster(config: ClusterConfig) -> ClusterMembership {
780 ClusterMembership::new(config)
781 .with_min_peer_interval(std::time::Duration::ZERO)
782 }
783
784 #[test]
785 fn add_and_list_peers() {
786 let cluster = make_cluster(ClusterConfig::default());
787 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
788 cluster.add_peer(make_peer("node-2", "beta")).unwrap();
789
790 let peers = cluster.list_peers();
791 assert_eq!(peers.len(), 2);
792 }
793
794 #[test]
795 fn add_duplicate_fails() {
796 let cluster = make_cluster(ClusterConfig::default());
797 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
798 assert!(matches!(
799 cluster.add_peer(make_peer("node-1", "alpha-dup")),
800 Err(ClusterError::NodeAlreadyExists { .. })
801 ));
802 }
803
804 #[test]
805 fn cluster_full() {
806 let config = ClusterConfig {
807 max_nodes: 1,
808 ..Default::default()
809 };
810 let cluster = make_cluster(config);
811 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
812 assert!(matches!(
813 cluster.add_peer(make_peer("node-2", "beta")),
814 Err(ClusterError::ClusterFull { .. })
815 ));
816 }
817
818 #[test]
819 fn remove_peer() {
820 let cluster = make_cluster(ClusterConfig::default());
821 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
822 let removed = cluster.remove_peer("node-1").unwrap();
823 assert_eq!(removed.name, "alpha");
824 assert!(cluster.is_empty());
825 }
826
827 #[test]
828 fn remove_nonexistent_fails() {
829 let cluster = make_cluster(ClusterConfig::default());
830 assert!(matches!(
831 cluster.remove_peer("nope"),
832 Err(ClusterError::NodeNotFound { .. })
833 ));
834 }
835
836 #[test]
837 fn update_state() {
838 let cluster = make_cluster(ClusterConfig::default());
839 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
840 cluster.update_state("node-1", NodeState::Suspect).unwrap();
841 let peer = cluster.get_peer("node-1").unwrap();
842 assert_eq!(peer.state, NodeState::Suspect);
843 }
844
845 #[test]
846 fn heartbeat_clears_suspect() {
847 let cluster = make_cluster(ClusterConfig::default());
848 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
849 cluster.update_state("node-1", NodeState::Suspect).unwrap();
850 cluster.heartbeat("node-1").unwrap();
851 let peer = cluster.get_peer("node-1").unwrap();
852 assert_eq!(peer.state, NodeState::Active);
853 }
854
855 #[test]
856 fn count_by_state() {
857 let cluster = make_cluster(ClusterConfig::default());
858 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
859 cluster.add_peer(make_peer("node-2", "beta")).unwrap();
860 cluster.update_state("node-2", NodeState::Suspect).unwrap();
861 assert_eq!(cluster.count_by_state(&NodeState::Active), 1);
862 assert_eq!(cluster.count_by_state(&NodeState::Suspect), 1);
863 }
864
865 #[test]
866 fn active_peers() {
867 let cluster = make_cluster(ClusterConfig::default());
868 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
869 cluster.add_peer(make_peer("node-2", "beta")).unwrap();
870 cluster.update_state("node-2", NodeState::Leaving).unwrap();
871 let active = cluster.active_peers();
872 assert_eq!(active.len(), 1);
873 assert_eq!(active[0], "node-1");
874 }
875
876 #[test]
877 fn peer_serde_roundtrip() {
878 let peer = make_peer("node-1", "alpha");
879 let json = serde_json::to_string(&peer).unwrap();
880 let restored: PeerNode = serde_json::from_str(&json).unwrap();
881 assert_eq!(restored.id, "node-1");
882 assert_eq!(restored.capabilities, vec!["compute"]);
883 }
884
885 #[test]
886 fn cluster_error_display() {
887 let err = ClusterError::NodeNotFound {
888 node_id: "node-1".into(),
889 };
890 assert!(err.to_string().contains("node-1"));
891
892 let err = ClusterError::ClusterFull { max: 10 };
893 assert!(err.to_string().contains("10"));
894 }
895
896 #[test]
897 fn mesh_and_auth_error_display() {
898 let err = ClusterError::Mesh("connection refused".into());
899 assert!(err.to_string().contains("connection refused"));
900
901 let err = ClusterError::AuthFailed("bad signature".into());
902 assert!(err.to_string().contains("bad signature"));
903 }
904
905 #[test]
906 fn default_config_new_fields() {
907 let config = ClusterConfig::default();
908 assert!(config.bind_address.is_none());
909 assert!(config.seed_peers.is_empty());
910 assert!(config.identity_key_path.is_none());
911 }
912
913 #[test]
914 fn rate_limited_peer_additions() {
915 let cluster = ClusterMembership::new(ClusterConfig::default());
917 cluster.add_peer(make_peer("node-1", "alpha")).unwrap();
918
919 let result = cluster.add_peer(make_peer("node-2", "beta"));
921 assert!(
922 matches!(result, Err(ClusterError::RateLimited)),
923 "expected RateLimited, got {result:?}"
924 );
925 }
926
927 #[test]
928 fn rate_limited_error_display() {
929 let err = ClusterError::RateLimited;
930 assert!(err.to_string().contains("rate limited"));
931 }
932
933 #[test]
934 fn validate_known_capabilities() {
935 let known = vec!["ipc".into(), "mesh".into(), "chain".into()];
936 let unknown = validate_capabilities(&known);
937 assert!(unknown.is_empty());
938 }
939
940 #[test]
941 fn validate_unknown_capabilities() {
942 let caps = vec!["ipc".into(), "teleport".into(), "quantum".into()];
943 let unknown = validate_capabilities(&caps);
944 assert_eq!(unknown, vec!["teleport", "quantum"]);
945 }
946
947 #[test]
948 fn add_peer_with_unknown_capabilities_succeeds() {
949 let cluster = make_cluster(ClusterConfig::default());
950 let mut peer = make_peer("node-1", "alpha");
951 peer.capabilities = vec!["ipc".into(), "teleport".into()];
952 cluster.add_peer(peer).unwrap();
954 assert_eq!(cluster.len(), 1);
955 }
956}
957
958#[cfg(test)]
959#[cfg(any(feature = "mesh", feature = "exochain"))]
960mod mesh_tests {
961 use super::*;
962
963 #[test]
964 fn node_identity_unique_ids() {
965 let id1 = NodeIdentity::generate();
966 let id2 = NodeIdentity::generate();
967 assert_ne!(id1.node_id(), id2.node_id());
968 }
969
970 #[test]
971 fn node_identity_sign_verify() {
972 use ed25519_dalek::Verifier;
973
974 let identity = NodeIdentity::generate();
975 let data = b"hello mesh";
976 let sig = identity.sign(data);
977 assert!(identity.public_key().verify(data, &sig).is_ok());
978 }
979
980 #[test]
981 fn node_identity_id_is_32_hex_chars() {
982 let identity = NodeIdentity::generate();
983 let nid = identity.node_id();
984 assert_eq!(nid.len(), 32, "node_id should be 32 hex chars (16 bytes)");
985 assert!(
986 nid.chars().all(|c| c.is_ascii_hexdigit()),
987 "node_id must be hex: {nid}"
988 );
989 }
990}