// oxirs_cluster — crate root (lib.rs)
1//! # OxiRS Cluster
2//!
3//! [![Version](https://img.shields.io/badge/version-0.2.4-blue)](https://github.com/cool-japan/oxirs/releases)
4//! [![docs.rs](https://docs.rs/oxirs-cluster/badge.svg)](https://docs.rs/oxirs-cluster)
5//!
6//! **Status**: Production Release (v0.2.4)
7//! **Stability**: Public APIs are stable. Production-ready with comprehensive testing.
8//!
9//! Raft-backed distributed dataset for high availability and horizontal scaling.
10//!
11//! This crate provides distributed storage capabilities using Raft consensus with
12//! multi-region support, Byzantine fault tolerance, and advanced replication strategies.
13//!
14//! ## Features
15//!
16//! - **Raft Consensus**: Production-ready Raft implementation using openraft
17//! - **Distributed RDF Storage**: Scalable, consistent RDF triple storage
18//! - **Automatic Failover**: Leader election and automatic recovery
19//! - **Node Discovery**: Multiple discovery mechanisms (static, DNS, multicast)
20//! - **Replication Management**: Configurable replication strategies
21//! - **SPARQL Support**: Distributed SPARQL query execution
22//! - **Transaction Support**: Distributed ACID transactions
23//!
24//! ## Example
25//!
26//! ```ignore
27//! use oxirs_cluster::{ClusterNode, NodeConfig};
28//! use std::net::SocketAddr;
29//!
30//! async fn example() -> Result<(), Box<dyn std::error::Error>> {
31//!     let config = NodeConfig {
32//!         node_id: 1,
33//!         address: "127.0.0.1:8080".parse()?,
34//!         data_dir: "./data".to_string(),
35//!         peers: vec![2, 3],
36//!     };
37//!
38//!     let mut node = ClusterNode::new(config).await?;
39//!     node.start().await?;
40//!
41//!     // Insert data through consensus
42//!     node.insert_triple(
43//!         "<http://example.org/subject>",
44//!         "<http://example.org/predicate>",
45//!         "\"object\"")
46//!     .await?;
47//!
48//!     Ok(())
49//! }
50//! ```
51
52#![allow(clippy::field_reassign_with_default)]
53#![allow(clippy::single_match)]
54#![allow(clippy::collapsible_if)]
55#![allow(clippy::clone_on_copy)]
56#![allow(clippy::type_complexity)]
57#![allow(clippy::collapsible_match)]
58#![allow(clippy::manual_clamp)]
59#![allow(clippy::needless_range_loop)]
60#![allow(clippy::or_fun_call)]
61#![allow(clippy::if_same_then_else)]
62#![allow(clippy::only_used_in_recursion)]
63#![allow(clippy::new_without_default)]
64#![allow(clippy::derivable_impls)]
65#![allow(clippy::useless_conversion)]
66use serde::{Deserialize, Serialize};
67use std::net::SocketAddr;
68use std::sync::Arc;
69use tokio::sync::RwLock;
70
71pub mod adaptive_leader_election;
72pub mod advanced_partitioning;
73pub mod advanced_storage;
74pub mod alerting;
75pub mod auto_scaling;
76pub mod backup_restore;
77pub mod circuit_breaker;
78pub mod cloud_integration;
79pub mod cluster_metrics;
80pub mod compression_strategy;
81pub mod conflict_resolution;
82pub mod consensus;
83pub mod crash_recovery;
84pub mod data_rebalancing;
85pub mod disaster_recovery;
86pub mod discovery;
87pub mod distributed_query;
88pub mod distributed_tracing;
89pub mod edge_computing;
90pub mod encryption;
91pub mod enhanced_node_discovery;
92pub mod enhanced_snapshotting;
93pub mod error;
94pub mod failover;
95pub mod federation;
96pub mod gpu_acceleration;
97pub mod health_monitor;
98pub mod health_monitoring;
99pub mod memory_optimization;
100pub mod merkle_tree;
101pub mod ml_optimization;
102pub mod multi_tenant;
103pub mod mvcc;
104pub mod mvcc_storage;
105pub mod network;
106pub mod neural_architecture_search;
107pub mod node_lifecycle;
108pub mod node_status_tracker;
109pub mod operational_transformation;
110pub mod optimization;
111pub mod partition_detection;
112pub mod performance_metrics;
113pub mod performance_monitor;
114pub mod raft;
115pub mod raft_optimization;
116pub mod raft_profiling;
117pub mod raft_state;
118pub mod range_partitioning;
119pub mod read_replica;
120pub mod region_manager;
121pub mod replication;
122pub mod replication_lag_monitor;
123pub mod rl_consensus_optimizer;
124pub mod rolling_upgrade;
125pub mod rolling_upgrade_orchestrator;
126pub mod split_brain_detector;
127pub mod visualization_dashboard;
128pub mod zero_downtime_migration;
129// Temporarily disabled due to missing scirs2_core features
130// pub mod revolutionary_cluster_optimization;
131pub mod cross_dc;
132pub mod network_compression;
133pub mod security;
134pub mod serialization;
135pub mod shard;
136pub mod shard_manager;
137pub mod shard_migration;
138pub mod shard_routing;
139pub mod split_brain_prevention;
140pub mod storage;
141pub mod strong_consistency;
142pub mod tls;
143pub mod topology;
144pub mod transaction;
145pub mod transaction_optimizer;
146
147#[cfg(feature = "bft")]
148pub mod bft;
149#[cfg(feature = "bft")]
150pub mod bft_consensus;
151#[cfg(feature = "bft")]
152pub mod bft_network;
153
154pub mod gossip_scaling;
155pub mod sla_manager;
156pub mod stream_integration;
157
158// New modules added in v0.2.0
159pub mod adaptive_consistent_hash;
160pub mod cross_dc_consistency;
161pub mod distributed_tx_coordinator;
162
163// v1.1.0 Consistent hashing with virtual nodes and bounded loads
164pub mod vnodes_hash_ring;
165
166// v1.2.0 Gossip protocol for cluster membership management
167pub mod membership_gossip;
168
169// v1.2.0 Bully algorithm leader election simulation
170pub mod leader_election;
171
172// v1.2.0 Raft snapshot management with retention and checksum validation
173pub mod snapshot_manager;
174
175// v1.5.0 Consistent-hash shard router
176pub mod consistent_shard_router;
177
178// v1.6.0 Partition rebalancing for cluster data redistribution
179pub mod partition_rebalancer;
180
181// v1.7.0 Cluster node health monitoring with heartbeats
182pub mod node_monitor;
183
184// v1.8.0 Automatic failover handling with split-brain prevention
185pub mod failover_manager;
186
187/// Anti-entropy protocol for distributed consistency (v1.9.0).
188pub mod anti_entropy;
189
190/// Replication bandwidth throttling: token-bucket per-peer rate limiting with adaptive adjustment (v2.0.0).
191pub mod replication_throttle;
192
193/// Data migration between cluster nodes: plan creation, range-based transfer,
194/// checksum-validated chunks, migration lifecycle and statistics (v1.1.0 round 14)
195pub mod data_migrator;
196
197/// Consistent-hash shard routing for distributed cluster nodes (v1.1.0 round 15)
198pub mod shard_router;
199
200/// Raft-style election timer: randomised timeout, TimerState (Idle/Running/Expired),
201/// reset/check/stop lifecycle, LCG seed for deterministic tests (v1.1.0 round 16)
202pub mod election_timer;
203
204pub use error::{ClusterError, Result};
205pub use failover::{FailoverConfig, FailoverManager, FailoverStrategy, RecoveryAction};
206pub use health_monitor::{HealthMonitor, HealthMonitorConfig, NodeHealth, SystemMetrics};
207
208// Temporarily disabled - Re-export revolutionary cluster optimization types
209// pub use revolutionary_cluster_optimization::{
210//     RevolutionaryClusterOptimizer, RevolutionaryClusterConfig, ConsensusOptimizationConfig,
211//     DataDistributionConfig, AdaptiveReplicationConfig, NetworkOptimizationConfig,
212//     ClusterPerformanceTargets, ClusterOptimizationResult, ClusterState, NodeState,
213//     ClusterOptimizationContext, ClusterAnalytics, ScalingPrediction,
214//     RevolutionaryClusterOptimizerFactory, ConsensusOptimizationStrategy,
215//     DataDistributionStrategy, AdaptiveReplicationStrategy, NetworkOptimizationStrategy,
216// };
217
218use conflict_resolution::{
219    ConflictResolver, ResolutionStrategy, TimestampedOperation, VectorClock,
220};
221use consensus::ConsensusManager;
222use discovery::{DiscoveryConfig, DiscoveryService, NodeInfo};
223use distributed_query::{DistributedQueryExecutor, ResultBinding};
224use edge_computing::{EdgeComputingManager, EdgeDeploymentStrategy, EdgeDeviceProfile};
225use raft::{OxirsNodeId, RdfResponse};
226use region_manager::{
227    ConsensusStrategy as RegionConsensusStrategy, MultiRegionReplicationStrategy, Region,
228    RegionManager,
229};
230use replication::{ReplicationManager, ReplicationStats, ReplicationStrategy};
231
/// Multi-region deployment configuration
///
/// Describes where a node sits in a multi-region topology (region,
/// availability zone, optional data center and rack) together with the
/// consensus, replication and conflict-resolution strategies used for
/// cross-region operations. Attached to [`NodeConfig::region_config`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiRegionConfig {
    /// Region identifier where this node is located
    pub region_id: String,
    /// Availability zone identifier
    pub availability_zone_id: String,
    /// Data center identifier (optional)
    pub data_center: Option<String>,
    /// Rack identifier (optional)
    pub rack: Option<String>,
    /// List of all regions in the deployment
    pub regions: Vec<Region>,
    /// Consensus strategy for multi-region operations
    pub consensus_strategy: RegionConsensusStrategy,
    /// Replication strategy for multi-region
    pub replication_strategy: MultiRegionReplicationStrategy,
    /// Conflict resolution strategy for distributed operations
    pub conflict_resolution_strategy: ResolutionStrategy,
    /// Edge computing configuration (None disables edge features)
    pub edge_config: Option<EdgeComputingConfig>,
    /// Enable advanced monitoring and metrics
    pub enable_monitoring: bool,
}
256
/// Edge computing configuration
///
/// Controls whether, and how, this node participates as an edge device.
/// Only consulted when nested inside a [`MultiRegionConfig`]; the
/// `enabled` flag gates creation of the edge-computing manager.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeComputingConfig {
    /// Enable edge computing features
    pub enabled: bool,
    /// Local edge device profile
    pub device_profile: EdgeDeviceProfile,
    /// Edge deployment strategy
    pub deployment_strategy: EdgeDeploymentStrategy,
    /// Enable intelligent caching
    pub enable_intelligent_caching: bool,
    /// Enable network condition monitoring
    pub enable_network_monitoring: bool,
}
271
/// Cluster node configuration
///
/// Everything needed to construct a [`ClusterNode`]: identity, network
/// address, storage location, initial peer set, and optional discovery,
/// replication, BFT and multi-region settings. Build with
/// [`NodeConfig::new`] and refine via the `with_*` builder methods.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Unique node identifier
    pub node_id: OxirsNodeId,
    /// Network address for communication
    pub address: SocketAddr,
    /// Data directory for persistent storage
    pub data_dir: String,
    /// List of peer node IDs
    pub peers: Vec<OxirsNodeId>,
    /// Discovery configuration (None falls back to the default at startup)
    pub discovery: Option<DiscoveryConfig>,
    /// Replication strategy (None falls back to the default at startup)
    pub replication_strategy: Option<ReplicationStrategy>,
    /// Use Byzantine fault tolerance instead of Raft
    #[cfg(feature = "bft")]
    pub use_bft: bool,
    /// Multi-region deployment configuration
    pub region_config: Option<MultiRegionConfig>,
}
293
294impl NodeConfig {
295    /// Create a new node configuration
296    pub fn new(node_id: OxirsNodeId, address: SocketAddr) -> Self {
297        Self {
298            node_id,
299            address,
300            data_dir: format!("./data/node-{node_id}"),
301            peers: Vec::new(),
302            discovery: Some(DiscoveryConfig::default()),
303            replication_strategy: Some(ReplicationStrategy::default()),
304            #[cfg(feature = "bft")]
305            use_bft: false,
306            region_config: None,
307        }
308    }
309
310    /// Add a peer to the configuration
311    pub fn add_peer(&mut self, peer_id: OxirsNodeId) -> &mut Self {
312        if !self.peers.contains(&peer_id) && peer_id != self.node_id {
313            self.peers.push(peer_id);
314        }
315        self
316    }
317
318    /// Set the discovery configuration
319    pub fn with_discovery(mut self, discovery: DiscoveryConfig) -> Self {
320        self.discovery = Some(discovery);
321        self
322    }
323
324    /// Set the replication strategy
325    pub fn with_replication_strategy(mut self, strategy: ReplicationStrategy) -> Self {
326        self.replication_strategy = Some(strategy);
327        self
328    }
329
330    /// Enable Byzantine fault tolerance
331    #[cfg(feature = "bft")]
332    pub fn with_bft(mut self, enable: bool) -> Self {
333        self.use_bft = enable;
334        self
335    }
336
337    /// Set multi-region configuration
338    pub fn with_multi_region(mut self, region_config: MultiRegionConfig) -> Self {
339        self.region_config = Some(region_config);
340        self
341    }
342
343    /// Check if multi-region is enabled
344    pub fn is_multi_region_enabled(&self) -> bool {
345        self.region_config.is_some()
346    }
347
348    /// Get region ID if configured
349    pub fn region_id(&self) -> Option<&str> {
350        self.region_config
351            .as_ref()
352            .map(|config| config.region_id.as_str())
353    }
354
355    /// Get availability zone ID if configured
356    pub fn availability_zone_id(&self) -> Option<&str> {
357        self.region_config
358            .as_ref()
359            .map(|config| config.availability_zone_id.as_str())
360    }
361}
362
/// Cluster node implementation
///
/// Owns every subsystem a cluster member needs: consensus, peer discovery,
/// replication, distributed query execution, conflict resolution, and
/// (optionally) multi-region and edge-computing management. Runtime flags
/// are wrapped in `Arc<RwLock<_>>` so spawned background tasks can observe
/// them after the node hands out clones.
pub struct ClusterNode {
    /// Node configuration; the peer list is mutated on membership changes.
    config: NodeConfig,
    /// Raft-based consensus layer; all writes are funneled through it.
    consensus: ConsensusManager,
    /// Peer discovery service (static/DNS/multicast per config).
    discovery: DiscoveryService,
    /// Replication manager tracking replicas of this node's data.
    replication: ReplicationManager,
    /// Distributed SPARQL query executor.
    query_executor: DistributedQueryExecutor,
    /// Present only when a multi-region configuration was supplied.
    region_manager: Option<Arc<RegionManager>>,
    /// Resolves conflicting distributed operations per the configured strategy.
    conflict_resolver: Arc<ConflictResolver>,
    /// Present only when edge computing is enabled in the region config.
    #[allow(dead_code)]
    edge_manager: Option<Arc<EdgeComputingManager>>,
    /// This node's vector clock, seeded with one increment for itself.
    local_vector_clock: Arc<RwLock<VectorClock>>,
    /// Set by start()/cleared by stop(); background tasks poll this flag.
    running: Arc<RwLock<bool>>,
    // NOTE(review): the two flags below are never read or written in this
    // view — presumably fault-injection/testing hooks; confirm elsewhere.
    byzantine_mode: Arc<RwLock<bool>>,
    network_isolated: Arc<RwLock<bool>>,
}
379
380impl ClusterNode {
    /// Create a new cluster node
    ///
    /// Validates the configuration, creates the data directory, and wires up
    /// the consensus, discovery, replication, query, conflict-resolution and
    /// (optionally) region/edge subsystems. The node is constructed in a
    /// stopped state; call [`ClusterNode::start`] to begin participating.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Config`] when `config.data_dir` is empty, and
    /// [`ClusterError::Other`] when the data directory cannot be created or
    /// when region/edge subsystem initialization fails.
    pub async fn new(config: NodeConfig) -> Result<Self> {
        // Validate configuration
        if config.data_dir.is_empty() {
            return Err(ClusterError::Config(
                "Data directory cannot be empty".to_string(),
            ));
        }

        // Create data directory if it doesn't exist
        tokio::fs::create_dir_all(&config.data_dir)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to create data directory: {e}")))?;

        // Initialize consensus manager
        let consensus = ConsensusManager::new(config.node_id, config.peers.clone());

        // Initialize discovery service
        let discovery_config = config.discovery.clone().unwrap_or_default();
        let discovery = DiscoveryService::new(config.node_id, config.address, discovery_config);

        // Initialize replication manager
        let replication_strategy = config.replication_strategy.clone().unwrap_or_default();
        let replication = ReplicationManager::new(replication_strategy, config.node_id);

        // Initialize distributed query executor
        let query_executor = DistributedQueryExecutor::new(config.node_id);

        // Initialize conflict resolver: region config takes precedence,
        // otherwise fall back to last-writer-wins.
        let default_resolution_strategy = if let Some(region_config) = &config.region_config {
            region_config.conflict_resolution_strategy.clone()
        } else {
            ResolutionStrategy::LastWriterWins
        };
        let conflict_resolver = Arc::new(ConflictResolver::new(default_resolution_strategy));

        // Initialize vector clock, pre-incremented once for this node.
        let mut vector_clock = VectorClock::new();
        vector_clock.increment(config.node_id);
        let local_vector_clock = Arc::new(RwLock::new(vector_clock));

        // Initialize region manager if multi-region is configured
        let region_manager = if let Some(region_config) = &config.region_config {
            let manager = Arc::new(RegionManager::new(
                region_config.region_id.clone(),
                region_config.availability_zone_id.clone(),
                region_config.consensus_strategy.clone(),
                region_config.replication_strategy.clone(),
            ));

            // Initialize with region topology
            manager
                .initialize(region_config.regions.clone())
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to initialize region manager: {e}"))
                })?;

            // Register this node in the region manager
            manager
                .register_node(
                    config.node_id,
                    region_config.region_id.clone(),
                    region_config.availability_zone_id.clone(),
                    region_config.data_center.clone(),
                    region_config.rack.clone(),
                )
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to register node in region manager: {e}"))
                })?;

            Some(manager)
        } else {
            None
        };

        // Initialize edge computing manager only when a region config exists,
        // it carries an edge config, and that edge config is enabled.
        let edge_manager = if let Some(region_config) = &config.region_config {
            if let Some(edge_config) = &region_config.edge_config {
                if edge_config.enabled {
                    let manager = Arc::new(EdgeComputingManager::new());

                    // Register this device with the edge manager
                    manager
                        .register_device(edge_config.device_profile.clone())
                        .await
                        .map_err(|e| {
                            ClusterError::Other(format!("Failed to register edge device: {e}"))
                        })?;

                    Some(manager)
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        Ok(Self {
            config,
            consensus,
            discovery,
            replication,
            query_executor,
            region_manager,
            conflict_resolver,
            edge_manager,
            local_vector_clock,
            running: Arc::new(RwLock::new(false)),
            byzantine_mode: Arc::new(RwLock::new(false)),
            network_isolated: Arc::new(RwLock::new(false)),
        })
    }
498
    /// Start the cluster node
    ///
    /// Idempotent: returns `Ok(())` immediately if already running.
    /// Otherwise marks the node as running, starts peer discovery, registers
    /// every discovered peer with the replication manager and query
    /// executor, initializes the consensus layer, and spawns background
    /// maintenance tasks.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Other`] when the discovery service fails to
    /// start, peer discovery fails, or consensus initialization fails.
    /// Note the `running` flag stays `true` in those cases.
    pub async fn start(&mut self) -> Result<()> {
        {
            // Scoped block so the write guard is dropped before the
            // await-heavy startup sequence below.
            let mut running = self.running.write().await;
            if *running {
                return Ok(());
            }
            *running = true;
        }

        tracing::info!(
            "Starting cluster node {} at {} with {} peers",
            self.config.node_id,
            self.config.address,
            self.config.peers.len()
        );

        // Start discovery service
        self.discovery
            .start()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to start discovery service: {e}")))?;

        // Discover initial nodes
        let discovered_nodes = self
            .discovery
            .discover_nodes()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to discover nodes: {e}")))?;

        // Add discovered nodes to replication manager and query executor,
        // skipping our own entry.
        for node in discovered_nodes {
            if node.node_id != self.config.node_id {
                self.replication
                    .add_replica(node.node_id, node.address.to_string());
                self.query_executor.add_node(node.node_id).await;
            }
        }

        // Initialize consensus system
        self.consensus
            .init()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to initialize consensus: {e}")))?;

        tracing::info!("Cluster node {} started successfully", self.config.node_id);

        // Start background tasks
        self.start_background_tasks().await;

        Ok(())
    }
551
    /// Stop the cluster node
    ///
    /// Idempotent: returns `Ok(())` immediately if not running. Stops the
    /// discovery service and clears the shared `running` flag so background
    /// tasks wind down. Consensus is left running; use
    /// [`ClusterNode::graceful_shutdown`] for a full teardown.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Other`] if the discovery service fails to
    /// stop — in which case the `running` flag remains `true`.
    pub async fn stop(&mut self) -> Result<()> {
        // The write guard is held across the awaits below (tokio RwLock
        // permits this); background tasks briefly block on reads meanwhile.
        let mut running = self.running.write().await;
        if !*running {
            return Ok(());
        }

        tracing::info!("Stopping cluster node {}", self.config.node_id);

        // Stop discovery service
        self.discovery
            .stop()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery service: {e}")))?;

        *running = false;

        tracing::info!("Cluster node {} stopped", self.config.node_id);

        Ok(())
    }
573
    /// Check if this node is the leader
    ///
    /// Delegates to the consensus layer; callable on any node.
    pub async fn is_leader(&self) -> bool {
        self.consensus.is_leader().await
    }
578
    /// Get current consensus term
    ///
    /// Delegates to the consensus layer's view of the Raft term.
    pub async fn current_term(&self) -> u64 {
        self.consensus.current_term().await
    }
583
584    /// Insert a triple through distributed consensus
585    pub async fn insert_triple(
586        &self,
587        subject: &str,
588        predicate: &str,
589        object: &str,
590    ) -> Result<RdfResponse> {
591        if !self.is_leader().await {
592            return Err(ClusterError::NotLeader);
593        }
594
595        let response = self
596            .consensus
597            .insert_triple(
598                subject.to_string(),
599                predicate.to_string(),
600                object.to_string(),
601            )
602            .await?;
603
604        Ok(response)
605    }
606
607    /// Delete a triple through distributed consensus
608    pub async fn delete_triple(
609        &self,
610        subject: &str,
611        predicate: &str,
612        object: &str,
613    ) -> Result<RdfResponse> {
614        if !self.is_leader().await {
615            return Err(ClusterError::NotLeader);
616        }
617
618        let response = self
619            .consensus
620            .delete_triple(
621                subject.to_string(),
622                predicate.to_string(),
623                object.to_string(),
624            )
625            .await?;
626
627        Ok(response)
628    }
629
630    /// Clear all triples through distributed consensus
631    pub async fn clear_store(&self) -> Result<RdfResponse> {
632        if !self.is_leader().await {
633            return Err(ClusterError::NotLeader);
634        }
635
636        let response = self.consensus.clear_store().await?;
637        Ok(response)
638    }
639
640    /// Begin a distributed transaction
641    pub async fn begin_transaction(&self) -> Result<String> {
642        if !self.is_leader().await {
643            return Err(ClusterError::NotLeader);
644        }
645
646        let tx_id = uuid::Uuid::new_v4().to_string();
647        let _response = self.consensus.begin_transaction(tx_id.clone()).await?;
648
649        Ok(tx_id)
650    }
651
652    /// Commit a distributed transaction
653    pub async fn commit_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
654        if !self.is_leader().await {
655            return Err(ClusterError::NotLeader);
656        }
657
658        let response = self.consensus.commit_transaction(tx_id.to_string()).await?;
659        Ok(response)
660    }
661
662    /// Rollback a distributed transaction
663    pub async fn rollback_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
664        if !self.is_leader().await {
665            return Err(ClusterError::NotLeader);
666        }
667
668        let response = self
669            .consensus
670            .rollback_transaction(tx_id.to_string())
671            .await?;
672        Ok(response)
673    }
674
    /// Query triples (can be done on any node)
    ///
    /// Matches triples against the given pattern; each position may be
    /// `None` (wildcard) or `Some(term)` to constrain the match. No leader
    /// check is performed, so any node can serve this read.
    pub async fn query_triples(
        &self,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Vec<(String, String, String)> {
        self.consensus.query(subject, predicate, object).await
    }
684
685    /// Execute SPARQL query using distributed query processing
686    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
687        let bindings = self
688            .query_executor
689            .execute_query(sparql)
690            .await
691            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))?;
692
693        // Convert result bindings to string format
694        let results = bindings
695            .into_iter()
696            .map(|binding| {
697                let vars: Vec<String> = binding
698                    .variables
699                    .into_iter()
700                    .map(|(var, val)| format!("{var}: {val}"))
701                    .collect();
702                vars.join(", ")
703            })
704            .collect();
705
706        Ok(results)
707    }
708
    /// Execute SPARQL query and return structured results
    ///
    /// Like [`ClusterNode::query_sparql`] but yields the raw
    /// [`ResultBinding`] values instead of formatted strings.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Other`] when query execution fails.
    pub async fn query_sparql_bindings(&self, sparql: &str) -> Result<Vec<ResultBinding>> {
        self.query_executor
            .execute_query(sparql)
            .await
            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))
    }
716
    /// Get query execution statistics
    ///
    /// Snapshot of per-key [`distributed_query::QueryStats`] from the query
    /// executor. The map key's meaning is defined by
    /// `DistributedQueryExecutor::get_statistics` (presumably a query or
    /// node identifier — confirm there). Currently infallible; the `Result`
    /// wrapper leaves room for future error reporting.
    pub async fn get_query_statistics(
        &self,
    ) -> Result<std::collections::HashMap<String, distributed_query::QueryStats>> {
        Ok(self.query_executor.get_statistics().await)
    }
723
    /// Clear query cache
    ///
    /// Drops the query executor's cached results. Infallible today; the
    /// `Result` wrapper keeps the signature future-proof.
    pub async fn clear_query_cache(&self) -> Result<()> {
        self.query_executor.clear_cache().await;
        Ok(())
    }
729
    /// Get the number of triples in the store
    ///
    /// Delegates to the consensus layer's local view of the store.
    pub async fn len(&self) -> usize {
        self.consensus.len().await
    }
734
    /// Check if the store is empty
    ///
    /// Companion to [`ClusterNode::len`]; delegates to the consensus layer.
    pub async fn is_empty(&self) -> bool {
        self.consensus.is_empty().await
    }
739
    /// Add a new node to the cluster
    ///
    /// Directly updates local membership state: config peer list, discovery,
    /// replication, query executor and the consensus peer set. Unlike
    /// [`ClusterNode::add_node_with_consensus`], the change is not proposed
    /// through the consensus log first.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Config`] when `node_id` is this node's own id.
    pub async fn add_cluster_node(
        &mut self,
        node_id: OxirsNodeId,
        address: SocketAddr,
    ) -> Result<()> {
        if node_id == self.config.node_id {
            return Err(ClusterError::Config(
                "Cannot add self to cluster".to_string(),
            ));
        }

        // Add to configuration (add_peer also deduplicates)
        self.config.add_peer(node_id);

        // Add to discovery
        let node_info = NodeInfo::new(node_id, address);
        self.discovery.add_node(node_info);

        // Add to replication
        self.replication.add_replica(node_id, address.to_string());

        // Add to query executor
        self.query_executor.add_node(node_id).await;

        // Add to consensus (this would trigger Raft membership change)
        self.consensus.add_peer(node_id);

        tracing::info!("Added node {} at {} to cluster", node_id, address);

        Ok(())
    }
772
    /// Remove a node from the cluster
    ///
    /// Mirror of [`ClusterNode::add_cluster_node`]: drops the node from the
    /// local config, discovery, replication, query executor and consensus
    /// peer set, without going through the consensus log first.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Config`] when `node_id` is this node's own id.
    pub async fn remove_cluster_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
        if node_id == self.config.node_id {
            return Err(ClusterError::Config(
                "Cannot remove self from cluster".to_string(),
            ));
        }

        // Remove from configuration
        self.config.peers.retain(|&id| id != node_id);

        // Remove from discovery
        self.discovery.remove_node(node_id);

        // Remove from replication
        self.replication.remove_replica(node_id);

        // Remove from query executor
        self.query_executor.remove_node(node_id).await;

        // Remove from consensus (this would trigger Raft membership change)
        self.consensus.remove_peer(node_id);

        tracing::info!("Removed node {} from cluster", node_id);

        Ok(())
    }
800
    /// Get comprehensive cluster status
    ///
    /// Assembles a point-in-time snapshot from the consensus, discovery and
    /// replication subsystems, plus region information when multi-region
    /// mode is active. Each subsystem is queried independently, so the
    /// fields are not guaranteed to be mutually consistent.
    pub async fn get_status(&self) -> ClusterStatus {
        let consensus_status = self.consensus.get_status().await;
        let discovery_stats = self.discovery.get_stats().clone();
        let replication_stats = self.replication.get_stats().clone();

        // Get region status if multi-region is enabled
        let region_status = if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region().to_string();
            let availability_zone_id = region_manager.get_local_availability_zone().to_string();
            let regional_peers = region_manager.get_nodes_in_region(&region_id).await;
            let topology = region_manager.get_topology().await;
            let monitoring_active = region_manager.is_monitoring_active().await;

            Some(RegionStatus {
                region_id,
                availability_zone_id,
                regional_peer_count: regional_peers.len(),
                total_regions: topology.regions.len(),
                monitoring_active,
            })
        } else {
            None
        };

        ClusterStatus {
            node_id: self.config.node_id,
            address: self.config.address,
            is_leader: consensus_status.is_leader,
            current_term: consensus_status.current_term,
            peer_count: consensus_status.peer_count,
            triple_count: consensus_status.triple_count,
            discovery_stats,
            replication_stats,
            is_running: *self.running.read().await,
            region_status,
        }
    }
839
    /// Start background maintenance tasks
    ///
    /// Spawns two detached tokio tasks: one driving periodic discovery work
    /// every second, and one running replication maintenance. Both are tied
    /// to the shared `running` flag.
    ///
    /// NOTE(review): both tasks operate on *freshly constructed*
    /// `DiscoveryService` / `ReplicationManager` instances rather than the
    /// ones stored on `self` — confirm this is intentional (e.g. the
    /// services share state internally) and not a bug.
    async fn start_background_tasks(&mut self) {
        let running = Arc::clone(&self.running);

        // Discovery and health check task
        let discovery_config = self.config.discovery.clone().unwrap_or_default();
        let mut discovery_clone =
            DiscoveryService::new(self.config.node_id, self.config.address, discovery_config);

        tokio::spawn(async move {
            // Polls the shared flag each second; exits once stop() clears it.
            while *running.read().await {
                discovery_clone.run_periodic_tasks().await;
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });

        // Replication maintenance task
        let mut replication_clone = ReplicationManager::with_raft_consensus(self.config.node_id);
        let running_clone = Arc::clone(&self.running);

        tokio::spawn(async move {
            // NOTE(review): the flag is consulted only once here; once
            // run_maintenance() starts, this task never observes stop().
            if *running_clone.read().await {
                replication_clone.run_maintenance().await; // run_maintenance() is infinite loop
            }
        });
    }
866
    /// Add a new node to the cluster using consensus protocol
    ///
    /// Proposes the membership change through the consensus layer first;
    /// only on success is local state (config, discovery, replication,
    /// query executor) updated to include the new node.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Other`] when the consensus-level membership
    /// change fails; local state is left untouched in that case.
    pub async fn add_node_with_consensus(
        &mut self,
        node_id: OxirsNodeId,
        address: SocketAddr,
    ) -> Result<()> {
        self.consensus
            .add_node_with_consensus(node_id, address.to_string())
            .await
            .map_err(|e| {
                ClusterError::Other(format!("Failed to add node through consensus: {e}"))
            })?;

        // Update local configuration
        self.config.add_peer(node_id);

        // Add to discovery, replication, and query executor
        let node_info = NodeInfo::new(node_id, address);
        self.discovery.add_node(node_info);
        self.replication.add_replica(node_id, address.to_string());
        self.query_executor.add_node(node_id).await;

        Ok(())
    }
891
    /// Remove a node from the cluster using consensus protocol
    ///
    /// Proposes the removal through the consensus layer first; only on
    /// success is local state (config, discovery, replication, query
    /// executor) updated to drop the node.
    ///
    /// # Errors
    ///
    /// Returns [`ClusterError::Other`] when the consensus-level membership
    /// change fails; local state is left untouched in that case.
    pub async fn remove_node_with_consensus(&mut self, node_id: OxirsNodeId) -> Result<()> {
        self.consensus
            .remove_node_with_consensus(node_id)
            .await
            .map_err(|e| {
                ClusterError::Other(format!("Failed to remove node through consensus: {e}"))
            })?;

        // Update local configuration
        self.config.peers.retain(|&id| id != node_id);

        // Remove from discovery, replication, and query executor
        self.discovery.remove_node(node_id);
        self.replication.remove_replica(node_id);
        self.query_executor.remove_node(node_id).await;

        Ok(())
    }
911
912    /// Gracefully shutdown this node
913    pub async fn graceful_shutdown(&mut self) -> Result<()> {
914        tracing::info!(
915            "Initiating graceful shutdown of cluster node {}",
916            self.config.node_id
917        );
918
919        // Stop background tasks first
920        {
921            let mut running = self.running.write().await;
922            *running = false;
923        }
924
925        // Gracefully shutdown consensus layer (includes leadership transfer if needed)
926        self.consensus
927            .graceful_shutdown()
928            .await
929            .map_err(|e| ClusterError::Other(format!("Failed to shutdown consensus: {e}")))?;
930
931        // Stop discovery and replication services
932        self.discovery
933            .stop()
934            .await
935            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery: {e}")))?;
936
937        tracing::info!("Cluster node {} gracefully shutdown", self.config.node_id);
938        Ok(())
939    }
940
941    /// Transfer leadership to another node
942    pub async fn transfer_leadership(&mut self, target_node: OxirsNodeId) -> Result<()> {
943        if !self.config.peers.contains(&target_node) {
944            return Err(ClusterError::Config(format!(
945                "Target node {target_node} not in cluster"
946            )));
947        }
948
949        self.consensus
950            .transfer_leadership(target_node)
951            .await
952            .map_err(|e| ClusterError::Other(format!("Failed to transfer leadership: {e}")))?;
953
954        Ok(())
955    }
956
957    /// Force evict a non-responsive node
958    pub async fn force_evict_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
959        self.consensus
960            .force_evict_node(node_id)
961            .await
962            .map_err(|e| ClusterError::Other(format!("Failed to force evict node: {e}")))?;
963
964        // Update local configuration
965        self.config.peers.retain(|&id| id != node_id);
966        self.discovery.remove_node(node_id);
967        self.replication.remove_replica(node_id);
968        self.query_executor.remove_node(node_id).await;
969
970        Ok(())
971    }
972
973    /// Check health of all peer nodes
974    pub async fn check_cluster_health(&self) -> Result<Vec<consensus::NodeHealthStatus>> {
975        self.consensus
976            .check_peer_health()
977            .await
978            .map_err(|e| ClusterError::Other(format!("Failed to check cluster health: {e}")))
979    }
980
981    /// Attempt recovery from partition or failure
982    pub async fn attempt_recovery(&mut self) -> Result<()> {
983        self.consensus
984            .attempt_recovery()
985            .await
986            .map_err(|e| ClusterError::Other(format!("Failed to recover cluster: {e}")))?;
987
988        tracing::info!(
989            "Cluster recovery completed for node {}",
990            self.config.node_id
991        );
992        Ok(())
993    }
994
    /// Get the node ID.
    ///
    /// This is the identifier assigned at construction via `NodeConfig`.
    pub fn id(&self) -> OxirsNodeId {
        self.config.node_id
    }
999
1000    /// Count triples in the store
1001    pub async fn count_triples(&self) -> Result<usize> {
1002        Ok(self.len().await)
1003    }
1004
1005    /// Check if the node is active (running and not isolated)
1006    pub async fn is_active(&self) -> Result<bool> {
1007        Ok(*self.running.read().await && !*self.network_isolated.read().await)
1008    }
1009
1010    /// Isolate the node from network (simulate network partition)
1011    pub async fn isolate_network(&self) -> Result<()> {
1012        let mut isolated = self.network_isolated.write().await;
1013        *isolated = true;
1014        tracing::info!("Node {} network isolated", self.config.node_id);
1015        Ok(())
1016    }
1017
1018    /// Restore network connectivity
1019    pub async fn restore_network(&self) -> Result<()> {
1020        let mut isolated = self.network_isolated.write().await;
1021        *isolated = false;
1022        tracing::info!("Node {} network restored", self.config.node_id);
1023        Ok(())
1024    }
1025
1026    /// Enable Byzantine behavior (for testing)
1027    pub async fn enable_byzantine_mode(&self) -> Result<()> {
1028        let mut byzantine = self.byzantine_mode.write().await;
1029        *byzantine = true;
1030        tracing::info!("Node {} Byzantine mode enabled", self.config.node_id);
1031        Ok(())
1032    }
1033
1034    /// Check if node is in Byzantine mode
1035    pub async fn is_byzantine(&self) -> Result<bool> {
1036        Ok(*self.byzantine_mode.read().await)
1037    }
1038
    /// Get multi-region manager (if configured).
    ///
    /// Returns `None` unless the node was built with a region configuration.
    pub fn region_manager(&self) -> Option<&Arc<RegionManager>> {
        self.region_manager.as_ref()
    }
1043
1044    /// Check if multi-region deployment is enabled
1045    pub fn is_multi_region_enabled(&self) -> bool {
1046        self.region_manager.is_some()
1047    }
1048
1049    /// Get current node's region ID
1050    pub fn get_region_id(&self) -> Option<String> {
1051        self.region_manager
1052            .as_ref()
1053            .map(|rm| rm.get_local_region().to_string())
1054    }
1055
1056    /// Get current node's availability zone ID
1057    pub fn get_availability_zone_id(&self) -> Option<String> {
1058        self.region_manager
1059            .as_ref()
1060            .map(|rm| rm.get_local_availability_zone().to_string())
1061    }
1062
1063    /// Get nodes in the same region
1064    pub async fn get_regional_peers(&self) -> Result<Vec<OxirsNodeId>> {
1065        if let Some(region_manager) = &self.region_manager {
1066            let region_id = region_manager.get_local_region();
1067            Ok(region_manager.get_nodes_in_region(region_id).await)
1068        } else {
1069            Err(ClusterError::Config(
1070                "Multi-region not configured".to_string(),
1071            ))
1072        }
1073    }
1074
1075    /// Get optimal leader candidates considering region affinity
1076    pub async fn get_regional_leader_candidates(&self) -> Result<Vec<OxirsNodeId>> {
1077        if let Some(region_manager) = &self.region_manager {
1078            let region_id = region_manager.get_local_region();
1079            Ok(region_manager.get_leader_candidates(region_id).await)
1080        } else {
1081            // Fall back to regular peer list
1082            Ok(self.config.peers.clone())
1083        }
1084    }
1085
1086    /// Calculate cross-region replication targets
1087    pub async fn get_cross_region_replication_targets(&self) -> Result<Vec<String>> {
1088        if let Some(region_manager) = &self.region_manager {
1089            let region_id = region_manager.get_local_region();
1090            region_manager
1091                .calculate_replication_targets(region_id)
1092                .await
1093                .map_err(|e| {
1094                    ClusterError::Other(format!("Failed to calculate replication targets: {e}"))
1095                })
1096        } else {
1097            Ok(Vec::new())
1098        }
1099    }
1100
1101    /// Monitor inter-region latencies and update metrics
1102    pub async fn monitor_region_latencies(&self) -> Result<()> {
1103        if let Some(region_manager) = &self.region_manager {
1104            region_manager.monitor_latencies().await.map_err(|e| {
1105                ClusterError::Other(format!("Failed to monitor region latencies: {e}"))
1106            })
1107        } else {
1108            Ok(())
1109        }
1110    }
1111
1112    /// Get region health status
1113    pub async fn get_region_health(&self, region_id: &str) -> Result<region_manager::RegionHealth> {
1114        if let Some(region_manager) = &self.region_manager {
1115            region_manager
1116                .get_region_health(region_id)
1117                .await
1118                .map_err(|e| ClusterError::Other(format!("Failed to get region health: {e}")))
1119        } else {
1120            Err(ClusterError::Config(
1121                "Multi-region not configured".to_string(),
1122            ))
1123        }
1124    }
1125
1126    /// Perform region failover operation
1127    pub async fn perform_region_failover(
1128        &self,
1129        failed_region: &str,
1130        target_region: &str,
1131    ) -> Result<()> {
1132        if let Some(region_manager) = &self.region_manager {
1133            region_manager
1134                .perform_region_failover(failed_region, target_region)
1135                .await
1136                .map_err(|e| ClusterError::Other(format!("Failed to perform region failover: {e}")))
1137        } else {
1138            Err(ClusterError::Config(
1139                "Multi-region not configured".to_string(),
1140            ))
1141        }
1142    }
1143
1144    /// Get multi-region topology information
1145    pub async fn get_region_topology(&self) -> Result<region_manager::RegionTopology> {
1146        if let Some(region_manager) = &self.region_manager {
1147            Ok(region_manager.get_topology().await)
1148        } else {
1149            Err(ClusterError::Config(
1150                "Multi-region not configured".to_string(),
1151            ))
1152        }
1153    }
1154
1155    /// Add a node to a specific region and availability zone
1156    pub async fn add_node_to_region(
1157        &self,
1158        node_id: OxirsNodeId,
1159        region_id: String,
1160        availability_zone_id: String,
1161        data_center: Option<String>,
1162        rack: Option<String>,
1163    ) -> Result<()> {
1164        if let Some(region_manager) = &self.region_manager {
1165            region_manager
1166                .register_node(node_id, region_id, availability_zone_id, data_center, rack)
1167                .await
1168                .map_err(|e| ClusterError::Other(format!("Failed to add node to region: {e}")))
1169        } else {
1170            Err(ClusterError::Config(
1171                "Multi-region not configured".to_string(),
1172            ))
1173        }
1174    }
1175
    /// Get conflict resolver instance.
    ///
    /// Exposes the shared resolver so callers can use its API directly.
    pub fn conflict_resolver(&self) -> &Arc<ConflictResolver> {
        &self.conflict_resolver
    }
1180
1181    /// Get current vector clock value
1182    pub async fn get_vector_clock(&self) -> VectorClock {
1183        self.local_vector_clock.read().await.clone()
1184    }
1185
1186    /// Update vector clock with received clock
1187    pub async fn update_vector_clock(&self, received_clock: &VectorClock) {
1188        let mut clock = self.local_vector_clock.write().await;
1189        clock.update(received_clock);
1190        clock.increment(self.config.node_id);
1191    }
1192
1193    /// Create a timestamped operation with current vector clock
1194    pub async fn create_timestamped_operation(
1195        &self,
1196        operation: conflict_resolution::RdfOperation,
1197        priority: u32,
1198    ) -> TimestampedOperation {
1199        let mut clock = self.local_vector_clock.write().await;
1200        clock.increment(self.config.node_id);
1201
1202        TimestampedOperation {
1203            operation_id: uuid::Uuid::new_v4().to_string(),
1204            origin_node: self.config.node_id,
1205            vector_clock: clock.clone(),
1206            physical_time: std::time::SystemTime::now(),
1207            operation,
1208            priority,
1209        }
1210    }
1211
1212    /// Detect conflicts in a batch of operations
1213    pub async fn detect_operation_conflicts(
1214        &self,
1215        operations: &[TimestampedOperation],
1216    ) -> Result<Vec<conflict_resolution::ConflictType>> {
1217        self.conflict_resolver
1218            .detect_conflicts(operations)
1219            .await
1220            .map_err(|e| ClusterError::Other(format!("Failed to detect conflicts: {e}")))
1221    }
1222
1223    /// Resolve conflicts using configured strategies
1224    pub async fn resolve_operation_conflicts(
1225        &self,
1226        conflicts: &[conflict_resolution::ConflictType],
1227    ) -> Result<Vec<conflict_resolution::ResolutionResult>> {
1228        self.conflict_resolver
1229            .resolve_conflicts(conflicts)
1230            .await
1231            .map_err(|e| ClusterError::Other(format!("Failed to resolve conflicts: {e}")))
1232    }
1233
1234    /// Submit an operation for conflict-aware processing
1235    pub async fn submit_conflict_aware_operation(
1236        &self,
1237        operation: conflict_resolution::RdfOperation,
1238        priority: u32,
1239    ) -> Result<RdfResponse> {
1240        // Create timestamped operation
1241        let _timestamped_op = self
1242            .create_timestamped_operation(operation.clone(), priority)
1243            .await;
1244
1245        // For now, submit to consensus without conflict detection
1246        // In a full implementation, this would be integrated with the consensus layer
1247        match operation {
1248            conflict_resolution::RdfOperation::Insert {
1249                subject,
1250                predicate,
1251                object,
1252                ..
1253            } => self.insert_triple(&subject, &predicate, &object).await,
1254            conflict_resolution::RdfOperation::Delete {
1255                subject,
1256                predicate,
1257                object,
1258                ..
1259            } => self.delete_triple(&subject, &predicate, &object).await,
1260            conflict_resolution::RdfOperation::Clear { .. } => self.clear_store().await,
1261            conflict_resolution::RdfOperation::Update {
1262                old_triple,
1263                new_triple,
1264                ..
1265            } => {
1266                // Implement as delete + insert
1267                let _delete_result = self
1268                    .delete_triple(&old_triple.0, &old_triple.1, &old_triple.2)
1269                    .await?;
1270                self.insert_triple(&new_triple.0, &new_triple.1, &new_triple.2)
1271                    .await
1272            }
1273            conflict_resolution::RdfOperation::Batch { operations: _ } => {
1274                // Process batch operations sequentially
1275                // Note: This is a simplified implementation that doesn't use recursion
1276                // In a full implementation, each operation would be processed individually
1277                // For now, just return success for batch operations
1278                Ok(RdfResponse::Success)
1279            }
1280        }
1281    }
1282
    /// Get conflict resolution statistics.
    ///
    /// Delegates to the shared resolver's statistics snapshot.
    pub async fn get_conflict_resolution_statistics(
        &self,
    ) -> conflict_resolution::ResolutionStatistics {
        self.conflict_resolver.get_statistics().await
    }
1289}
1290
/// Comprehensive cluster status information.
///
/// A point-in-time snapshot; returned by status queries such as
/// [`DistributedStore::get_status`].
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Local node ID.
    pub node_id: OxirsNodeId,
    /// Local node address.
    pub address: SocketAddr,
    /// Whether this node is the current leader.
    pub is_leader: bool,
    /// Current Raft term.
    pub current_term: u64,
    /// Number of peer nodes (excluding this node).
    pub peer_count: usize,
    /// Number of triples in the store.
    pub triple_count: usize,
    /// Discovery service statistics.
    pub discovery_stats: discovery::DiscoveryStats,
    /// Replication statistics.
    pub replication_stats: ReplicationStats,
    /// Whether the node is currently running.
    pub is_running: bool,
    /// Multi-region status; `None` when multi-region is not enabled.
    pub region_status: Option<RegionStatus>,
}
1315
/// Multi-region status information.
///
/// Populated only when the node runs with a multi-region configuration; see
/// [`ClusterStatus::region_status`].
#[derive(Debug, Clone)]
pub struct RegionStatus {
    /// Current region ID.
    pub region_id: String,
    /// Current availability zone ID.
    pub availability_zone_id: String,
    /// Number of nodes in the same region.
    pub regional_peer_count: usize,
    /// Total number of regions in the topology.
    pub total_regions: usize,
    /// Whether multi-region monitoring is active.
    pub monitoring_active: bool,
}
1330
/// Distributed RDF store (simplified interface).
///
/// Thin facade that owns a `ClusterNode` and forwards the common store
/// operations to it.
pub struct DistributedStore {
    // The underlying cluster node that performs all actual work.
    node: ClusterNode,
}
1335
1336impl DistributedStore {
1337    /// Create a new distributed store
1338    pub async fn new(config: NodeConfig) -> Result<Self> {
1339        let node = ClusterNode::new(config).await?;
1340        Ok(Self { node })
1341    }
1342
1343    /// Start the distributed store
1344    pub async fn start(&mut self) -> Result<()> {
1345        self.node.start().await
1346    }
1347
1348    /// Stop the distributed store
1349    pub async fn stop(&mut self) -> Result<()> {
1350        self.node.stop().await
1351    }
1352
1353    /// Insert a triple (only on leader)
1354    pub async fn insert_triple(
1355        &mut self,
1356        subject: &str,
1357        predicate: &str,
1358        object: &str,
1359    ) -> Result<()> {
1360        let _response = self.node.insert_triple(subject, predicate, object).await?;
1361        Ok(())
1362    }
1363
1364    /// Query triples using SPARQL
1365    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
1366        self.node.query_sparql(sparql).await
1367    }
1368
1369    /// Query triples by pattern
1370    pub async fn query_pattern(
1371        &self,
1372        subject: Option<&str>,
1373        predicate: Option<&str>,
1374        object: Option<&str>,
1375    ) -> Vec<(String, String, String)> {
1376        self.node.query_triples(subject, predicate, object).await
1377    }
1378
1379    /// Get cluster status
1380    pub async fn get_status(&self) -> ClusterStatus {
1381        self.node.get_status().await
1382    }
1383}
1384
1385/// Re-export commonly used types
1386pub use consensus::ConsensusError;
1387pub use discovery::DiscoveryError;
1388pub use replication::ReplicationError;
1389
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback socket address (127.0.0.1:8080) shared by the tests below.
    fn test_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080)
    }

    #[tokio::test]
    async fn test_node_config_creation() {
        let addr = test_addr();
        let config = NodeConfig::new(1, addr);

        assert_eq!(config.node_id, 1);
        assert_eq!(config.address, addr);
        assert_eq!(config.data_dir, "./data/node-1");
        assert!(config.peers.is_empty());
        assert!(config.discovery.is_some());
        assert!(config.replication_strategy.is_some());
        assert!(config.region_config.is_none());
    }

    #[tokio::test]
    async fn test_node_config_add_peer() {
        let mut config = NodeConfig::new(1, test_addr());

        // The second `2` is a duplicate and must be ignored.
        for peer in [2, 3, 2] {
            config.add_peer(peer);
        }

        assert_eq!(config.peers, vec![2, 3]);
    }

    #[tokio::test]
    async fn test_node_config_no_self_peer() {
        let mut config = NodeConfig::new(1, test_addr());

        // Adding our own node id must be a no-op.
        config.add_peer(1);

        assert!(config.peers.is_empty());
    }

    #[tokio::test]
    async fn test_cluster_node_creation() {
        let addr = test_addr();

        let node = ClusterNode::new(NodeConfig::new(1, addr))
            .await
            .expect("node creation should succeed");

        assert_eq!(node.config.node_id, 1);
        assert_eq!(node.config.address, addr);
    }

    #[tokio::test]
    async fn test_cluster_node_empty_data_dir_error() {
        let mut config = NodeConfig::new(1, test_addr());
        config.data_dir = String::new();

        let err = ClusterNode::new(config)
            .await
            .expect_err("empty data dir must be rejected");
        assert!(err.to_string().contains("Data directory cannot be empty"));
    }

    #[tokio::test]
    async fn test_distributed_store_creation() {
        let store = DistributedStore::new(NodeConfig::new(1, test_addr())).await;
        assert!(store.is_ok());
    }

    #[test]
    fn test_cluster_error_types() {
        let config_err = ClusterError::Config("test error".to_string());
        assert!(config_err
            .to_string()
            .contains("Configuration error: test error"));

        assert_eq!(ClusterError::NotLeader.to_string(), "Not the leader node");

        let net_err = ClusterError::Network("connection failed".to_string());
        assert!(net_err
            .to_string()
            .contains("Network error: connection failed"));
    }
}
1477}