oxirs_cluster/
lib.rs

1//! # OxiRS Cluster
2//!
3//! [![Version](https://img.shields.io/badge/version-0.1.0--beta.1-blue)](https://github.com/cool-japan/oxirs/releases)
4//! [![docs.rs](https://docs.rs/oxirs-cluster/badge.svg)](https://docs.rs/oxirs-cluster)
5//!
6//! **Status**: Beta Release (v0.1.0-beta.1)
7//! **Stability**: Public APIs are stable. Production-ready with comprehensive testing.
8//!
9//! Raft-backed distributed dataset for high availability and horizontal scaling.
10//!
11//! This crate provides distributed storage capabilities using Raft consensus with
12//! multi-region support, Byzantine fault tolerance, and advanced replication strategies.
13//!
14//! ## Features
15//!
16//! - **Raft Consensus**: Production-ready Raft implementation using openraft
17//! - **Distributed RDF Storage**: Scalable, consistent RDF triple storage
18//! - **Automatic Failover**: Leader election and automatic recovery
19//! - **Node Discovery**: Multiple discovery mechanisms (static, DNS, multicast)
20//! - **Replication Management**: Configurable replication strategies
21//! - **SPARQL Support**: Distributed SPARQL query execution
22//! - **Transaction Support**: Distributed ACID transactions
23//!
24//! ## Example
25//!
26//! ```rust,no_run
27//! use oxirs_cluster::{ClusterNode, NodeConfig};
28//! use std::net::SocketAddr;
29//!
30//! # async fn example() -> anyhow::Result<()> {
//! // Built via `NodeConfig::new` so the example compiles with or without the
//! // optional `bft` feature (the `use_bft` field only exists when it is enabled).
//! let mut config = NodeConfig::new(1, "127.0.0.1:8080".parse()?);
//! config.data_dir = "./data".to_string();
//! config.peers = vec![2, 3];
//! config.discovery = None;
//! config.replication_strategy = None;
41//!
42//! let mut node = ClusterNode::new(config).await?;
43//! node.start().await?;
44//!
45//! // Insert data through consensus
46//! node.insert_triple(
47//!     "<http://example.org/subject>",
48//!     "<http://example.org/predicate>",
49//!     "\"object\"")
50//! .await?;
51//! # Ok(())
52//! # }
53//! ```
54
55#![allow(clippy::field_reassign_with_default)]
56#![allow(clippy::single_match)]
57#![allow(clippy::collapsible_if)]
58#![allow(clippy::clone_on_copy)]
59#![allow(clippy::type_complexity)]
60#![allow(clippy::collapsible_match)]
61#![allow(clippy::manual_clamp)]
62#![allow(clippy::needless_range_loop)]
63#![allow(clippy::or_fun_call)]
64#![allow(clippy::if_same_then_else)]
65#![allow(clippy::only_used_in_recursion)]
66#![allow(clippy::new_without_default)]
67#![allow(clippy::derivable_impls)]
68#![allow(clippy::useless_conversion)]
69use serde::{Deserialize, Serialize};
70use std::net::SocketAddr;
71use std::sync::Arc;
72use tokio::sync::RwLock;
73
74pub mod adaptive_leader_election;
75pub mod advanced_partitioning;
76pub mod advanced_storage;
77pub mod alerting;
78pub mod auto_scaling;
79pub mod backup_restore;
80pub mod circuit_breaker;
81pub mod conflict_resolution;
82pub mod consensus;
83pub mod crash_recovery;
84pub mod data_rebalancing;
85pub mod disaster_recovery;
86pub mod discovery;
87pub mod distributed_query;
88pub mod distributed_tracing;
89pub mod edge_computing;
90pub mod enhanced_node_discovery;
91pub mod enhanced_snapshotting;
92pub mod error;
93pub mod failover;
94pub mod federation;
95pub mod health_monitor;
96pub mod health_monitoring;
97pub mod memory_optimization;
98pub mod merkle_tree;
99pub mod mvcc;
100pub mod mvcc_storage;
101pub mod network;
102pub mod node_lifecycle;
103pub mod node_status_tracker;
104pub mod operational_transformation;
105pub mod optimization;
106pub mod partition_detection;
107pub mod performance_metrics;
108pub mod performance_monitor;
109pub mod raft;
110pub mod raft_optimization;
111pub mod raft_profiling;
112pub mod raft_state;
113pub mod range_partitioning;
114pub mod read_replica;
115pub mod region_manager;
116pub mod replication;
117pub mod replication_lag_monitor;
118pub mod rolling_upgrade;
119pub mod visualization_dashboard;
120pub mod zero_downtime_migration;
121// Temporarily disabled due to missing scirs2_core features
122// pub mod revolutionary_cluster_optimization;
123pub mod security;
124pub mod serialization;
125pub mod shard;
126pub mod shard_manager;
127pub mod shard_migration;
128pub mod shard_routing;
129pub mod split_brain_prevention;
130pub mod storage;
131pub mod strong_consistency;
132pub mod tls;
133pub mod transaction;
134pub mod transaction_optimizer;
135
136#[cfg(feature = "bft")]
137pub mod bft;
138#[cfg(feature = "bft")]
139pub mod bft_consensus;
140#[cfg(feature = "bft")]
141pub mod bft_network;
142
143pub use error::{ClusterError, Result};
144pub use failover::{FailoverConfig, FailoverManager, FailoverStrategy, RecoveryAction};
145pub use health_monitor::{HealthMonitor, HealthMonitorConfig, NodeHealth, SystemMetrics};
146
147// Temporarily disabled - Re-export revolutionary cluster optimization types
148// pub use revolutionary_cluster_optimization::{
149//     RevolutionaryClusterOptimizer, RevolutionaryClusterConfig, ConsensusOptimizationConfig,
150//     DataDistributionConfig, AdaptiveReplicationConfig, NetworkOptimizationConfig,
151//     ClusterPerformanceTargets, ClusterOptimizationResult, ClusterState, NodeState,
152//     ClusterOptimizationContext, ClusterAnalytics, ScalingPrediction,
153//     RevolutionaryClusterOptimizerFactory, ConsensusOptimizationStrategy,
154//     DataDistributionStrategy, AdaptiveReplicationStrategy, NetworkOptimizationStrategy,
155// };
156
157use conflict_resolution::{
158    ConflictResolver, ResolutionStrategy, TimestampedOperation, VectorClock,
159};
160use consensus::ConsensusManager;
161use discovery::{DiscoveryConfig, DiscoveryService, NodeInfo};
162use distributed_query::{DistributedQueryExecutor, ResultBinding};
163use edge_computing::{EdgeComputingManager, EdgeDeploymentStrategy, EdgeDeviceProfile};
164use raft::{OxirsNodeId, RdfResponse};
165use region_manager::{
166    ConsensusStrategy as RegionConsensusStrategy, MultiRegionReplicationStrategy, Region,
167    RegionManager,
168};
169use replication::{ReplicationManager, ReplicationStats, ReplicationStrategy};
170
/// Multi-region deployment configuration.
///
/// Supplied through `NodeConfig::region_config`. When it is present,
/// `ClusterNode::new` builds a `RegionManager` from `region_id`,
/// `availability_zone_id`, `consensus_strategy` and `replication_strategy`,
/// initializes it with `regions`, and registers the local node using the
/// location fields below.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiRegionConfig {
    /// Region identifier where this node is located
    pub region_id: String,
    /// Availability zone identifier
    pub availability_zone_id: String,
    /// Data center identifier (optional); forwarded when the node is registered
    pub data_center: Option<String>,
    /// Rack identifier (optional); forwarded when the node is registered
    pub rack: Option<String>,
    /// List of all regions in the deployment; used to initialize the region topology
    pub regions: Vec<Region>,
    /// Consensus strategy for multi-region operations
    pub consensus_strategy: RegionConsensusStrategy,
    /// Replication strategy for multi-region
    pub replication_strategy: MultiRegionReplicationStrategy,
    /// Conflict resolution strategy for distributed operations; becomes the
    /// default strategy of the node's `ConflictResolver`
    pub conflict_resolution_strategy: ResolutionStrategy,
    /// Edge computing configuration; `None` disables edge computing entirely
    pub edge_config: Option<EdgeComputingConfig>,
    /// Enable advanced monitoring and metrics
    pub enable_monitoring: bool,
}
195
/// Edge computing configuration.
///
/// Nested inside `MultiRegionConfig::edge_config`. `ClusterNode::new` only
/// creates an `EdgeComputingManager` (and registers `device_profile` with it)
/// when `enabled` is `true`.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeComputingConfig {
    /// Enable edge computing features
    pub enabled: bool,
    /// Local edge device profile; registered with the edge manager at node creation
    pub device_profile: EdgeDeviceProfile,
    /// Edge deployment strategy
    pub deployment_strategy: EdgeDeploymentStrategy,
    /// Enable intelligent caching
    pub enable_intelligent_caching: bool,
    /// Enable network condition monitoring
    pub enable_network_monitoring: bool,
}
210
/// Cluster node configuration.
///
/// Prefer [`NodeConfig::new`] plus the `with_*` builders over a struct literal:
/// the `use_bft` field only exists when the `bft` feature is enabled, so a
/// literal that names every field is not portable across feature sets.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Unique node identifier
    pub node_id: OxirsNodeId,
    /// Network address for communication
    pub address: SocketAddr,
    /// Data directory for persistent storage; must be non-empty
    /// (`ClusterNode::new` rejects an empty string and creates the directory)
    pub data_dir: String,
    /// List of peer node IDs
    pub peers: Vec<OxirsNodeId>,
    /// Discovery configuration; `None` falls back to `DiscoveryConfig::default()`
    pub discovery: Option<DiscoveryConfig>,
    /// Replication strategy; `None` falls back to `ReplicationStrategy::default()`
    pub replication_strategy: Option<ReplicationStrategy>,
    /// Use Byzantine fault tolerance instead of Raft
    #[cfg(feature = "bft")]
    pub use_bft: bool,
    /// Multi-region deployment configuration; `None` means single-region
    pub region_config: Option<MultiRegionConfig>,
}
232
233impl NodeConfig {
234    /// Create a new node configuration
235    pub fn new(node_id: OxirsNodeId, address: SocketAddr) -> Self {
236        Self {
237            node_id,
238            address,
239            data_dir: format!("./data/node-{node_id}"),
240            peers: Vec::new(),
241            discovery: Some(DiscoveryConfig::default()),
242            replication_strategy: Some(ReplicationStrategy::default()),
243            #[cfg(feature = "bft")]
244            use_bft: false,
245            region_config: None,
246        }
247    }
248
249    /// Add a peer to the configuration
250    pub fn add_peer(&mut self, peer_id: OxirsNodeId) -> &mut Self {
251        if !self.peers.contains(&peer_id) && peer_id != self.node_id {
252            self.peers.push(peer_id);
253        }
254        self
255    }
256
257    /// Set the discovery configuration
258    pub fn with_discovery(mut self, discovery: DiscoveryConfig) -> Self {
259        self.discovery = Some(discovery);
260        self
261    }
262
263    /// Set the replication strategy
264    pub fn with_replication_strategy(mut self, strategy: ReplicationStrategy) -> Self {
265        self.replication_strategy = Some(strategy);
266        self
267    }
268
269    /// Enable Byzantine fault tolerance
270    #[cfg(feature = "bft")]
271    pub fn with_bft(mut self, enable: bool) -> Self {
272        self.use_bft = enable;
273        self
274    }
275
276    /// Set multi-region configuration
277    pub fn with_multi_region(mut self, region_config: MultiRegionConfig) -> Self {
278        self.region_config = Some(region_config);
279        self
280    }
281
282    /// Check if multi-region is enabled
283    pub fn is_multi_region_enabled(&self) -> bool {
284        self.region_config.is_some()
285    }
286
287    /// Get region ID if configured
288    pub fn region_id(&self) -> Option<&str> {
289        self.region_config
290            .as_ref()
291            .map(|config| config.region_id.as_str())
292    }
293
294    /// Get availability zone ID if configured
295    pub fn availability_zone_id(&self) -> Option<&str> {
296        self.region_config
297            .as_ref()
298            .map(|config| config.availability_zone_id.as_str())
299    }
300}
301
/// Cluster node implementation.
///
/// Bundles the consensus, discovery, replication and query components of a
/// single node together with its optional multi-region / edge managers and a
/// few shared state flags.
pub struct ClusterNode {
    // Configuration passed to `new`; `peers` is updated by the membership methods.
    config: NodeConfig,
    // Consensus layer; all write operations and membership changes go through it.
    consensus: ConsensusManager,
    // Node discovery service, started/stopped together with the node.
    discovery: DiscoveryService,
    // Tracks replicas for every known peer.
    replication: ReplicationManager,
    // Executes SPARQL queries across the known nodes.
    query_executor: DistributedQueryExecutor,
    // `Some` only when `config.region_config` was provided.
    region_manager: Option<Arc<RegionManager>>,
    // Uses the region's strategy, or `LastWriterWins` when no region config exists.
    conflict_resolver: Arc<ConflictResolver>,
    // `Some` only when an edge config is present and enabled.
    #[allow(dead_code)]
    edge_manager: Option<Arc<EdgeComputingManager>>,
    // Created in `new` and incremented once for this node's id.
    local_vector_clock: Arc<RwLock<VectorClock>>,
    // Set by `start`, cleared by `stop`/`graceful_shutdown`; polled by background tasks.
    running: Arc<RwLock<bool>>,
    // Initialised to `false` in `new`; no other use is visible in this part of the file.
    byzantine_mode: Arc<RwLock<bool>>,
    // Initialised to `false` in `new`; read by `is_active`.
    network_isolated: Arc<RwLock<bool>>,
}
318
319impl ClusterNode {
320    /// Create a new cluster node
321    pub async fn new(config: NodeConfig) -> Result<Self> {
322        // Validate configuration
323        if config.data_dir.is_empty() {
324            return Err(ClusterError::Config(
325                "Data directory cannot be empty".to_string(),
326            ));
327        }
328
329        // Create data directory if it doesn't exist
330        tokio::fs::create_dir_all(&config.data_dir)
331            .await
332            .map_err(|e| ClusterError::Other(format!("Failed to create data directory: {e}")))?;
333
334        // Initialize consensus manager
335        let consensus = ConsensusManager::new(config.node_id, config.peers.clone());
336
337        // Initialize discovery service
338        let discovery_config = config.discovery.clone().unwrap_or_default();
339        let discovery = DiscoveryService::new(config.node_id, config.address, discovery_config);
340
341        // Initialize replication manager
342        let replication_strategy = config.replication_strategy.clone().unwrap_or_default();
343        let replication = ReplicationManager::new(replication_strategy, config.node_id);
344
345        // Initialize distributed query executor
346        let query_executor = DistributedQueryExecutor::new(config.node_id);
347
348        // Initialize conflict resolver
349        let default_resolution_strategy = if let Some(region_config) = &config.region_config {
350            region_config.conflict_resolution_strategy.clone()
351        } else {
352            ResolutionStrategy::LastWriterWins
353        };
354        let conflict_resolver = Arc::new(ConflictResolver::new(default_resolution_strategy));
355
356        // Initialize vector clock
357        let mut vector_clock = VectorClock::new();
358        vector_clock.increment(config.node_id);
359        let local_vector_clock = Arc::new(RwLock::new(vector_clock));
360
361        // Initialize region manager if multi-region is configured
362        let region_manager = if let Some(region_config) = &config.region_config {
363            let manager = Arc::new(RegionManager::new(
364                region_config.region_id.clone(),
365                region_config.availability_zone_id.clone(),
366                region_config.consensus_strategy.clone(),
367                region_config.replication_strategy.clone(),
368            ));
369
370            // Initialize with region topology
371            manager
372                .initialize(region_config.regions.clone())
373                .await
374                .map_err(|e| {
375                    ClusterError::Other(format!("Failed to initialize region manager: {e}"))
376                })?;
377
378            // Register this node in the region manager
379            manager
380                .register_node(
381                    config.node_id,
382                    region_config.region_id.clone(),
383                    region_config.availability_zone_id.clone(),
384                    region_config.data_center.clone(),
385                    region_config.rack.clone(),
386                )
387                .await
388                .map_err(|e| {
389                    ClusterError::Other(format!("Failed to register node in region manager: {e}"))
390                })?;
391
392            Some(manager)
393        } else {
394            None
395        };
396
397        // Initialize edge computing manager if configured
398        let edge_manager = if let Some(region_config) = &config.region_config {
399            if let Some(edge_config) = &region_config.edge_config {
400                if edge_config.enabled {
401                    let manager = Arc::new(EdgeComputingManager::new());
402
403                    // Register this device with the edge manager
404                    manager
405                        .register_device(edge_config.device_profile.clone())
406                        .await
407                        .map_err(|e| {
408                            ClusterError::Other(format!("Failed to register edge device: {e}"))
409                        })?;
410
411                    Some(manager)
412                } else {
413                    None
414                }
415            } else {
416                None
417            }
418        } else {
419            None
420        };
421
422        Ok(Self {
423            config,
424            consensus,
425            discovery,
426            replication,
427            query_executor,
428            region_manager,
429            conflict_resolver,
430            edge_manager,
431            local_vector_clock,
432            running: Arc::new(RwLock::new(false)),
433            byzantine_mode: Arc::new(RwLock::new(false)),
434            network_isolated: Arc::new(RwLock::new(false)),
435        })
436    }
437
438    /// Start the cluster node
439    pub async fn start(&mut self) -> Result<()> {
440        {
441            let mut running = self.running.write().await;
442            if *running {
443                return Ok(());
444            }
445            *running = true;
446        }
447
448        tracing::info!(
449            "Starting cluster node {} at {} with {} peers",
450            self.config.node_id,
451            self.config.address,
452            self.config.peers.len()
453        );
454
455        // Start discovery service
456        self.discovery
457            .start()
458            .await
459            .map_err(|e| ClusterError::Other(format!("Failed to start discovery service: {e}")))?;
460
461        // Discover initial nodes
462        let discovered_nodes = self
463            .discovery
464            .discover_nodes()
465            .await
466            .map_err(|e| ClusterError::Other(format!("Failed to discover nodes: {e}")))?;
467
468        // Add discovered nodes to replication manager and query executor
469        for node in discovered_nodes {
470            if node.node_id != self.config.node_id {
471                self.replication
472                    .add_replica(node.node_id, node.address.to_string());
473                self.query_executor.add_node(node.node_id).await;
474            }
475        }
476
477        // Initialize consensus system
478        self.consensus
479            .init()
480            .await
481            .map_err(|e| ClusterError::Other(format!("Failed to initialize consensus: {e}")))?;
482
483        tracing::info!("Cluster node {} started successfully", self.config.node_id);
484
485        // Start background tasks
486        self.start_background_tasks().await;
487
488        Ok(())
489    }
490
491    /// Stop the cluster node
492    pub async fn stop(&mut self) -> Result<()> {
493        let mut running = self.running.write().await;
494        if !*running {
495            return Ok(());
496        }
497
498        tracing::info!("Stopping cluster node {}", self.config.node_id);
499
500        // Stop discovery service
501        self.discovery
502            .stop()
503            .await
504            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery service: {e}")))?;
505
506        *running = false;
507
508        tracing::info!("Cluster node {} stopped", self.config.node_id);
509
510        Ok(())
511    }
512
    /// Check if this node is the leader.
    ///
    /// Delegates to the consensus manager; the write operations on this type
    /// use it as their leadership guard.
    pub async fn is_leader(&self) -> bool {
        self.consensus.is_leader().await
    }
517
    /// Get current consensus term, as reported by the consensus manager.
    pub async fn current_term(&self) -> u64 {
        self.consensus.current_term().await
    }
522
523    /// Insert a triple through distributed consensus
524    pub async fn insert_triple(
525        &self,
526        subject: &str,
527        predicate: &str,
528        object: &str,
529    ) -> Result<RdfResponse> {
530        if !self.is_leader().await {
531            return Err(ClusterError::NotLeader);
532        }
533
534        let response = self
535            .consensus
536            .insert_triple(
537                subject.to_string(),
538                predicate.to_string(),
539                object.to_string(),
540            )
541            .await?;
542
543        Ok(response)
544    }
545
546    /// Delete a triple through distributed consensus
547    pub async fn delete_triple(
548        &self,
549        subject: &str,
550        predicate: &str,
551        object: &str,
552    ) -> Result<RdfResponse> {
553        if !self.is_leader().await {
554            return Err(ClusterError::NotLeader);
555        }
556
557        let response = self
558            .consensus
559            .delete_triple(
560                subject.to_string(),
561                predicate.to_string(),
562                object.to_string(),
563            )
564            .await?;
565
566        Ok(response)
567    }
568
569    /// Clear all triples through distributed consensus
570    pub async fn clear_store(&self) -> Result<RdfResponse> {
571        if !self.is_leader().await {
572            return Err(ClusterError::NotLeader);
573        }
574
575        let response = self.consensus.clear_store().await?;
576        Ok(response)
577    }
578
579    /// Begin a distributed transaction
580    pub async fn begin_transaction(&self) -> Result<String> {
581        if !self.is_leader().await {
582            return Err(ClusterError::NotLeader);
583        }
584
585        let tx_id = uuid::Uuid::new_v4().to_string();
586        let _response = self.consensus.begin_transaction(tx_id.clone()).await?;
587
588        Ok(tx_id)
589    }
590
591    /// Commit a distributed transaction
592    pub async fn commit_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
593        if !self.is_leader().await {
594            return Err(ClusterError::NotLeader);
595        }
596
597        let response = self.consensus.commit_transaction(tx_id.to_string()).await?;
598        Ok(response)
599    }
600
601    /// Rollback a distributed transaction
602    pub async fn rollback_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
603        if !self.is_leader().await {
604            return Err(ClusterError::NotLeader);
605        }
606
607        let response = self
608            .consensus
609            .rollback_transaction(tx_id.to_string())
610            .await?;
611        Ok(response)
612    }
613
    /// Query triples (can be done on any node).
    ///
    /// No leadership check is performed. The three optional arguments are
    /// forwarded unchanged to the consensus manager's `query`; results are
    /// returned as `(subject, predicate, object)` string tuples.
    pub async fn query_triples(
        &self,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Vec<(String, String, String)> {
        self.consensus.query(subject, predicate, object).await
    }
623
624    /// Execute SPARQL query using distributed query processing
625    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
626        let bindings = self
627            .query_executor
628            .execute_query(sparql)
629            .await
630            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))?;
631
632        // Convert result bindings to string format
633        let results = bindings
634            .into_iter()
635            .map(|binding| {
636                let vars: Vec<String> = binding
637                    .variables
638                    .into_iter()
639                    .map(|(var, val)| format!("{var}: {val}"))
640                    .collect();
641                vars.join(", ")
642            })
643            .collect();
644
645        Ok(results)
646    }
647
648    /// Execute SPARQL query and return structured results
649    pub async fn query_sparql_bindings(&self, sparql: &str) -> Result<Vec<ResultBinding>> {
650        self.query_executor
651            .execute_query(sparql)
652            .await
653            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))
654    }
655
    /// Get query execution statistics.
    ///
    /// Returns the query executor's statistics map as-is. This method never
    /// produces an `Err`; the `Result` wrapper is part of the signature only.
    pub async fn get_query_statistics(
        &self,
    ) -> Result<std::collections::HashMap<String, distributed_query::QueryStats>> {
        Ok(self.query_executor.get_statistics().await)
    }
662
    /// Clear query cache.
    ///
    /// Delegates to the query executor's `clear_cache`. This method never
    /// produces an `Err`; the `Result` wrapper is part of the signature only.
    pub async fn clear_query_cache(&self) -> Result<()> {
        self.query_executor.clear_cache().await;
        Ok(())
    }
668
    /// Get the number of triples in the store, as reported by the consensus manager.
    pub async fn len(&self) -> usize {
        self.consensus.len().await
    }
673
    /// Check if the store is empty, as reported by the consensus manager.
    pub async fn is_empty(&self) -> bool {
        self.consensus.is_empty().await
    }
678
679    /// Add a new node to the cluster
680    pub async fn add_cluster_node(
681        &mut self,
682        node_id: OxirsNodeId,
683        address: SocketAddr,
684    ) -> Result<()> {
685        if node_id == self.config.node_id {
686            return Err(ClusterError::Config(
687                "Cannot add self to cluster".to_string(),
688            ));
689        }
690
691        // Add to configuration
692        self.config.add_peer(node_id);
693
694        // Add to discovery
695        let node_info = NodeInfo::new(node_id, address);
696        self.discovery.add_node(node_info);
697
698        // Add to replication
699        self.replication.add_replica(node_id, address.to_string());
700
701        // Add to query executor
702        self.query_executor.add_node(node_id).await;
703
704        // Add to consensus (this would trigger Raft membership change)
705        self.consensus.add_peer(node_id);
706
707        tracing::info!("Added node {} at {} to cluster", node_id, address);
708
709        Ok(())
710    }
711
712    /// Remove a node from the cluster
713    pub async fn remove_cluster_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
714        if node_id == self.config.node_id {
715            return Err(ClusterError::Config(
716                "Cannot remove self from cluster".to_string(),
717            ));
718        }
719
720        // Remove from configuration
721        self.config.peers.retain(|&id| id != node_id);
722
723        // Remove from discovery
724        self.discovery.remove_node(node_id);
725
726        // Remove from replication
727        self.replication.remove_replica(node_id);
728
729        // Remove from query executor
730        self.query_executor.remove_node(node_id).await;
731
732        // Remove from consensus (this would trigger Raft membership change)
733        self.consensus.remove_peer(node_id);
734
735        tracing::info!("Removed node {} from cluster", node_id);
736
737        Ok(())
738    }
739
740    /// Get comprehensive cluster status
741    pub async fn get_status(&self) -> ClusterStatus {
742        let consensus_status = self.consensus.get_status().await;
743        let discovery_stats = self.discovery.get_stats().clone();
744        let replication_stats = self.replication.get_stats().clone();
745
746        // Get region status if multi-region is enabled
747        let region_status = if let Some(region_manager) = &self.region_manager {
748            let region_id = region_manager.get_local_region().to_string();
749            let availability_zone_id = region_manager.get_local_availability_zone().to_string();
750            let regional_peers = region_manager.get_nodes_in_region(&region_id).await;
751            let topology = region_manager.get_topology().await;
752
753            Some(RegionStatus {
754                region_id,
755                availability_zone_id,
756                regional_peer_count: regional_peers.len(),
757                total_regions: topology.regions.len(),
758                monitoring_active: true, // TODO: Check actual monitoring status
759            })
760        } else {
761            None
762        };
763
764        ClusterStatus {
765            node_id: self.config.node_id,
766            address: self.config.address,
767            is_leader: consensus_status.is_leader,
768            current_term: consensus_status.current_term,
769            peer_count: consensus_status.peer_count,
770            triple_count: consensus_status.triple_count,
771            discovery_stats,
772            replication_stats,
773            is_running: *self.running.read().await,
774            region_status,
775        }
776    }
777
778    /// Start background maintenance tasks
779    async fn start_background_tasks(&mut self) {
780        let running = Arc::clone(&self.running);
781
782        // Discovery and health check task
783        let discovery_config = self.config.discovery.clone().unwrap_or_default();
784        let mut discovery_clone =
785            DiscoveryService::new(self.config.node_id, self.config.address, discovery_config);
786
787        tokio::spawn(async move {
788            while *running.read().await {
789                discovery_clone.run_periodic_tasks().await;
790                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
791            }
792        });
793
794        // Replication maintenance task
795        let mut replication_clone = ReplicationManager::with_raft_consensus(self.config.node_id);
796        let running_clone = Arc::clone(&self.running);
797
798        tokio::spawn(async move {
799            if *running_clone.read().await {
800                replication_clone.run_maintenance().await; // run_maintenance() is infinite loop
801            }
802        });
803    }
804
805    /// Add a new node to the cluster using consensus protocol
806    pub async fn add_node_with_consensus(
807        &mut self,
808        node_id: OxirsNodeId,
809        address: SocketAddr,
810    ) -> Result<()> {
811        self.consensus
812            .add_node_with_consensus(node_id, address.to_string())
813            .await
814            .map_err(|e| {
815                ClusterError::Other(format!("Failed to add node through consensus: {e}"))
816            })?;
817
818        // Update local configuration
819        self.config.add_peer(node_id);
820
821        // Add to discovery, replication, and query executor
822        let node_info = NodeInfo::new(node_id, address);
823        self.discovery.add_node(node_info);
824        self.replication.add_replica(node_id, address.to_string());
825        self.query_executor.add_node(node_id).await;
826
827        Ok(())
828    }
829
830    /// Remove a node from the cluster using consensus protocol
831    pub async fn remove_node_with_consensus(&mut self, node_id: OxirsNodeId) -> Result<()> {
832        self.consensus
833            .remove_node_with_consensus(node_id)
834            .await
835            .map_err(|e| {
836                ClusterError::Other(format!("Failed to remove node through consensus: {e}"))
837            })?;
838
839        // Update local configuration
840        self.config.peers.retain(|&id| id != node_id);
841
842        // Remove from discovery, replication, and query executor
843        self.discovery.remove_node(node_id);
844        self.replication.remove_replica(node_id);
845        self.query_executor.remove_node(node_id).await;
846
847        Ok(())
848    }
849
850    /// Gracefully shutdown this node
851    pub async fn graceful_shutdown(&mut self) -> Result<()> {
852        tracing::info!(
853            "Initiating graceful shutdown of cluster node {}",
854            self.config.node_id
855        );
856
857        // Stop background tasks first
858        {
859            let mut running = self.running.write().await;
860            *running = false;
861        }
862
863        // Gracefully shutdown consensus layer (includes leadership transfer if needed)
864        self.consensus
865            .graceful_shutdown()
866            .await
867            .map_err(|e| ClusterError::Other(format!("Failed to shutdown consensus: {e}")))?;
868
869        // Stop discovery and replication services
870        self.discovery
871            .stop()
872            .await
873            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery: {e}")))?;
874
875        tracing::info!("Cluster node {} gracefully shutdown", self.config.node_id);
876        Ok(())
877    }
878
879    /// Transfer leadership to another node
880    pub async fn transfer_leadership(&mut self, target_node: OxirsNodeId) -> Result<()> {
881        if !self.config.peers.contains(&target_node) {
882            return Err(ClusterError::Config(format!(
883                "Target node {target_node} not in cluster"
884            )));
885        }
886
887        self.consensus
888            .transfer_leadership(target_node)
889            .await
890            .map_err(|e| ClusterError::Other(format!("Failed to transfer leadership: {e}")))?;
891
892        Ok(())
893    }
894
895    /// Force evict a non-responsive node
896    pub async fn force_evict_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
897        self.consensus
898            .force_evict_node(node_id)
899            .await
900            .map_err(|e| ClusterError::Other(format!("Failed to force evict node: {e}")))?;
901
902        // Update local configuration
903        self.config.peers.retain(|&id| id != node_id);
904        self.discovery.remove_node(node_id);
905        self.replication.remove_replica(node_id);
906        self.query_executor.remove_node(node_id).await;
907
908        Ok(())
909    }
910
911    /// Check health of all peer nodes
912    pub async fn check_cluster_health(&self) -> Result<Vec<consensus::NodeHealthStatus>> {
913        self.consensus
914            .check_peer_health()
915            .await
916            .map_err(|e| ClusterError::Other(format!("Failed to check cluster health: {e}")))
917    }
918
919    /// Attempt recovery from partition or failure
920    pub async fn attempt_recovery(&mut self) -> Result<()> {
921        self.consensus
922            .attempt_recovery()
923            .await
924            .map_err(|e| ClusterError::Other(format!("Failed to recover cluster: {e}")))?;
925
926        tracing::info!(
927            "Cluster recovery completed for node {}",
928            self.config.node_id
929        );
930        Ok(())
931    }
932
933    /// Get the node ID
934    pub fn id(&self) -> OxirsNodeId {
935        self.config.node_id
936    }
937
938    /// Count triples in the store
939    pub async fn count_triples(&self) -> Result<usize> {
940        Ok(self.len().await)
941    }
942
943    /// Check if the node is active (running and not isolated)
944    pub async fn is_active(&self) -> Result<bool> {
945        Ok(*self.running.read().await && !*self.network_isolated.read().await)
946    }
947
948    /// Isolate the node from network (simulate network partition)
949    pub async fn isolate_network(&self) -> Result<()> {
950        let mut isolated = self.network_isolated.write().await;
951        *isolated = true;
952        tracing::info!("Node {} network isolated", self.config.node_id);
953        Ok(())
954    }
955
956    /// Restore network connectivity
957    pub async fn restore_network(&self) -> Result<()> {
958        let mut isolated = self.network_isolated.write().await;
959        *isolated = false;
960        tracing::info!("Node {} network restored", self.config.node_id);
961        Ok(())
962    }
963
964    /// Enable Byzantine behavior (for testing)
965    pub async fn enable_byzantine_mode(&self) -> Result<()> {
966        let mut byzantine = self.byzantine_mode.write().await;
967        *byzantine = true;
968        tracing::info!("Node {} Byzantine mode enabled", self.config.node_id);
969        Ok(())
970    }
971
972    /// Check if node is in Byzantine mode
973    pub async fn is_byzantine(&self) -> Result<bool> {
974        Ok(*self.byzantine_mode.read().await)
975    }
976
977    /// Get multi-region manager (if configured)
978    pub fn region_manager(&self) -> Option<&Arc<RegionManager>> {
979        self.region_manager.as_ref()
980    }
981
982    /// Check if multi-region deployment is enabled
983    pub fn is_multi_region_enabled(&self) -> bool {
984        self.region_manager.is_some()
985    }
986
987    /// Get current node's region ID
988    pub fn get_region_id(&self) -> Option<String> {
989        self.region_manager
990            .as_ref()
991            .map(|rm| rm.get_local_region().to_string())
992    }
993
994    /// Get current node's availability zone ID
995    pub fn get_availability_zone_id(&self) -> Option<String> {
996        self.region_manager
997            .as_ref()
998            .map(|rm| rm.get_local_availability_zone().to_string())
999    }
1000
1001    /// Get nodes in the same region
1002    pub async fn get_regional_peers(&self) -> Result<Vec<OxirsNodeId>> {
1003        if let Some(region_manager) = &self.region_manager {
1004            let region_id = region_manager.get_local_region();
1005            Ok(region_manager.get_nodes_in_region(region_id).await)
1006        } else {
1007            Err(ClusterError::Config(
1008                "Multi-region not configured".to_string(),
1009            ))
1010        }
1011    }
1012
1013    /// Get optimal leader candidates considering region affinity
1014    pub async fn get_regional_leader_candidates(&self) -> Result<Vec<OxirsNodeId>> {
1015        if let Some(region_manager) = &self.region_manager {
1016            let region_id = region_manager.get_local_region();
1017            Ok(region_manager.get_leader_candidates(region_id).await)
1018        } else {
1019            // Fall back to regular peer list
1020            Ok(self.config.peers.clone())
1021        }
1022    }
1023
1024    /// Calculate cross-region replication targets
1025    pub async fn get_cross_region_replication_targets(&self) -> Result<Vec<String>> {
1026        if let Some(region_manager) = &self.region_manager {
1027            let region_id = region_manager.get_local_region();
1028            region_manager
1029                .calculate_replication_targets(region_id)
1030                .await
1031                .map_err(|e| {
1032                    ClusterError::Other(format!("Failed to calculate replication targets: {e}"))
1033                })
1034        } else {
1035            Ok(Vec::new())
1036        }
1037    }
1038
1039    /// Monitor inter-region latencies and update metrics
1040    pub async fn monitor_region_latencies(&self) -> Result<()> {
1041        if let Some(region_manager) = &self.region_manager {
1042            region_manager.monitor_latencies().await.map_err(|e| {
1043                ClusterError::Other(format!("Failed to monitor region latencies: {e}"))
1044            })
1045        } else {
1046            Ok(())
1047        }
1048    }
1049
1050    /// Get region health status
1051    pub async fn get_region_health(&self, region_id: &str) -> Result<region_manager::RegionHealth> {
1052        if let Some(region_manager) = &self.region_manager {
1053            region_manager
1054                .get_region_health(region_id)
1055                .await
1056                .map_err(|e| ClusterError::Other(format!("Failed to get region health: {e}")))
1057        } else {
1058            Err(ClusterError::Config(
1059                "Multi-region not configured".to_string(),
1060            ))
1061        }
1062    }
1063
1064    /// Perform region failover operation
1065    pub async fn perform_region_failover(
1066        &self,
1067        failed_region: &str,
1068        target_region: &str,
1069    ) -> Result<()> {
1070        if let Some(region_manager) = &self.region_manager {
1071            region_manager
1072                .perform_region_failover(failed_region, target_region)
1073                .await
1074                .map_err(|e| ClusterError::Other(format!("Failed to perform region failover: {e}")))
1075        } else {
1076            Err(ClusterError::Config(
1077                "Multi-region not configured".to_string(),
1078            ))
1079        }
1080    }
1081
1082    /// Get multi-region topology information
1083    pub async fn get_region_topology(&self) -> Result<region_manager::RegionTopology> {
1084        if let Some(region_manager) = &self.region_manager {
1085            Ok(region_manager.get_topology().await)
1086        } else {
1087            Err(ClusterError::Config(
1088                "Multi-region not configured".to_string(),
1089            ))
1090        }
1091    }
1092
1093    /// Add a node to a specific region and availability zone
1094    pub async fn add_node_to_region(
1095        &self,
1096        node_id: OxirsNodeId,
1097        region_id: String,
1098        availability_zone_id: String,
1099        data_center: Option<String>,
1100        rack: Option<String>,
1101    ) -> Result<()> {
1102        if let Some(region_manager) = &self.region_manager {
1103            region_manager
1104                .register_node(node_id, region_id, availability_zone_id, data_center, rack)
1105                .await
1106                .map_err(|e| ClusterError::Other(format!("Failed to add node to region: {e}")))
1107        } else {
1108            Err(ClusterError::Config(
1109                "Multi-region not configured".to_string(),
1110            ))
1111        }
1112    }
1113
1114    /// Get conflict resolver instance
1115    pub fn conflict_resolver(&self) -> &Arc<ConflictResolver> {
1116        &self.conflict_resolver
1117    }
1118
1119    /// Get current vector clock value
1120    pub async fn get_vector_clock(&self) -> VectorClock {
1121        self.local_vector_clock.read().await.clone()
1122    }
1123
1124    /// Update vector clock with received clock
1125    pub async fn update_vector_clock(&self, received_clock: &VectorClock) {
1126        let mut clock = self.local_vector_clock.write().await;
1127        clock.update(received_clock);
1128        clock.increment(self.config.node_id);
1129    }
1130
1131    /// Create a timestamped operation with current vector clock
1132    pub async fn create_timestamped_operation(
1133        &self,
1134        operation: conflict_resolution::RdfOperation,
1135        priority: u32,
1136    ) -> TimestampedOperation {
1137        let mut clock = self.local_vector_clock.write().await;
1138        clock.increment(self.config.node_id);
1139
1140        TimestampedOperation {
1141            operation_id: uuid::Uuid::new_v4().to_string(),
1142            origin_node: self.config.node_id,
1143            vector_clock: clock.clone(),
1144            physical_time: std::time::SystemTime::now(),
1145            operation,
1146            priority,
1147        }
1148    }
1149
1150    /// Detect conflicts in a batch of operations
1151    pub async fn detect_operation_conflicts(
1152        &self,
1153        operations: &[TimestampedOperation],
1154    ) -> Result<Vec<conflict_resolution::ConflictType>> {
1155        self.conflict_resolver
1156            .detect_conflicts(operations)
1157            .await
1158            .map_err(|e| ClusterError::Other(format!("Failed to detect conflicts: {e}")))
1159    }
1160
1161    /// Resolve conflicts using configured strategies
1162    pub async fn resolve_operation_conflicts(
1163        &self,
1164        conflicts: &[conflict_resolution::ConflictType],
1165    ) -> Result<Vec<conflict_resolution::ResolutionResult>> {
1166        self.conflict_resolver
1167            .resolve_conflicts(conflicts)
1168            .await
1169            .map_err(|e| ClusterError::Other(format!("Failed to resolve conflicts: {e}")))
1170    }
1171
1172    /// Submit an operation for conflict-aware processing
1173    pub async fn submit_conflict_aware_operation(
1174        &self,
1175        operation: conflict_resolution::RdfOperation,
1176        priority: u32,
1177    ) -> Result<RdfResponse> {
1178        // Create timestamped operation
1179        let _timestamped_op = self
1180            .create_timestamped_operation(operation.clone(), priority)
1181            .await;
1182
1183        // For now, submit to consensus without conflict detection
1184        // In a full implementation, this would be integrated with the consensus layer
1185        match operation {
1186            conflict_resolution::RdfOperation::Insert {
1187                subject,
1188                predicate,
1189                object,
1190                ..
1191            } => self.insert_triple(&subject, &predicate, &object).await,
1192            conflict_resolution::RdfOperation::Delete {
1193                subject,
1194                predicate,
1195                object,
1196                ..
1197            } => self.delete_triple(&subject, &predicate, &object).await,
1198            conflict_resolution::RdfOperation::Clear { .. } => self.clear_store().await,
1199            conflict_resolution::RdfOperation::Update {
1200                old_triple,
1201                new_triple,
1202                ..
1203            } => {
1204                // Implement as delete + insert
1205                let _delete_result = self
1206                    .delete_triple(&old_triple.0, &old_triple.1, &old_triple.2)
1207                    .await?;
1208                self.insert_triple(&new_triple.0, &new_triple.1, &new_triple.2)
1209                    .await
1210            }
1211            conflict_resolution::RdfOperation::Batch { operations: _ } => {
1212                // Process batch operations sequentially
1213                // Note: This is a simplified implementation that doesn't use recursion
1214                // In a full implementation, each operation would be processed individually
1215                // For now, just return success for batch operations
1216                Ok(RdfResponse::Success)
1217            }
1218        }
1219    }
1220
1221    /// Get conflict resolution statistics
1222    pub async fn get_conflict_resolution_statistics(
1223        &self,
1224    ) -> conflict_resolution::ResolutionStatistics {
1225        self.conflict_resolver.get_statistics().await
1226    }
1227}
1228
1229/// Comprehensive cluster status information
1230#[derive(Debug, Clone)]
1231pub struct ClusterStatus {
1232    /// Local node ID
1233    pub node_id: OxirsNodeId,
1234    /// Local node address
1235    pub address: SocketAddr,
1236    /// Whether this node is the current leader
1237    pub is_leader: bool,
1238    /// Current Raft term
1239    pub current_term: u64,
1240    /// Number of peer nodes
1241    pub peer_count: usize,
1242    /// Number of triples in the store
1243    pub triple_count: usize,
1244    /// Discovery service statistics
1245    pub discovery_stats: discovery::DiscoveryStats,
1246    /// Replication statistics
1247    pub replication_stats: ReplicationStats,
1248    /// Whether the node is currently running
1249    pub is_running: bool,
1250    /// Multi-region status (if enabled)
1251    pub region_status: Option<RegionStatus>,
1252}
1253
/// Multi-region details carried in [`ClusterStatus::region_status`].
#[derive(Clone, Debug)]
pub struct RegionStatus {
    /// ID of the current region.
    pub region_id: String,
    /// ID of the current availability zone.
    pub availability_zone_id: String,
    /// How many nodes are in the same region.
    pub regional_peer_count: usize,
    /// How many regions the topology contains in total.
    pub total_regions: usize,
    /// `true` when multi-region monitoring is active.
    pub monitoring_active: bool,
}
1268
1269/// Distributed RDF store (simplified interface)
1270pub struct DistributedStore {
1271    node: ClusterNode,
1272}
1273
1274impl DistributedStore {
1275    /// Create a new distributed store
1276    pub async fn new(config: NodeConfig) -> Result<Self> {
1277        let node = ClusterNode::new(config).await?;
1278        Ok(Self { node })
1279    }
1280
1281    /// Start the distributed store
1282    pub async fn start(&mut self) -> Result<()> {
1283        self.node.start().await
1284    }
1285
1286    /// Stop the distributed store
1287    pub async fn stop(&mut self) -> Result<()> {
1288        self.node.stop().await
1289    }
1290
1291    /// Insert a triple (only on leader)
1292    pub async fn insert_triple(
1293        &mut self,
1294        subject: &str,
1295        predicate: &str,
1296        object: &str,
1297    ) -> Result<()> {
1298        let _response = self.node.insert_triple(subject, predicate, object).await?;
1299        Ok(())
1300    }
1301
1302    /// Query triples using SPARQL
1303    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
1304        self.node.query_sparql(sparql).await
1305    }
1306
1307    /// Query triples by pattern
1308    pub async fn query_pattern(
1309        &self,
1310        subject: Option<&str>,
1311        predicate: Option<&str>,
1312        object: Option<&str>,
1313    ) -> Vec<(String, String, String)> {
1314        self.node.query_triples(subject, predicate, object).await
1315    }
1316
1317    /// Get cluster status
1318    pub async fn get_status(&self) -> ClusterStatus {
1319        self.node.get_status().await
1320    }
1321}
1322
1323/// Re-export commonly used types
1324pub use consensus::ConsensusError;
1325pub use discovery::DiscoveryError;
1326pub use replication::ReplicationError;
1327
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    /// Loopback address on port 8080 shared by the tests in this module.
    fn loopback_addr() -> SocketAddr {
        SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080)
    }

    /// `NodeConfig::new` fills in the expected defaults.
    #[tokio::test]
    async fn test_node_config_creation() {
        let addr = loopback_addr();
        let config = NodeConfig::new(1, addr);

        assert_eq!(config.node_id, 1);
        assert_eq!(config.address, addr);
        assert_eq!(config.data_dir, "./data/node-1");
        assert!(config.peers.is_empty());
        assert!(config.discovery.is_some());
        assert!(config.replication_strategy.is_some());
        assert!(config.region_config.is_none());
    }

    /// Adding the same peer twice keeps a single entry.
    #[tokio::test]
    async fn test_node_config_add_peer() {
        let mut config = NodeConfig::new(1, loopback_addr());

        // The second `2` is a duplicate and should be ignored.
        for peer in [2, 3, 2] {
            config.add_peer(peer);
        }

        assert_eq!(config.peers, vec![2, 3]);
    }

    /// A node never lists itself as a peer.
    #[tokio::test]
    async fn test_node_config_no_self_peer() {
        let mut config = NodeConfig::new(1, loopback_addr());

        config.add_peer(1); // Own ID: should not be added

        assert!(config.peers.is_empty());
    }

    /// A node can be created from a default configuration.
    #[tokio::test]
    async fn test_cluster_node_creation() {
        let addr = loopback_addr();
        let config = NodeConfig::new(1, addr);

        let created = ClusterNode::new(config).await;
        assert!(created.is_ok());

        let node = created.unwrap();
        assert_eq!(node.config.node_id, 1);
        assert_eq!(node.config.address, addr);
    }

    /// An empty data directory is rejected with a descriptive error.
    #[tokio::test]
    async fn test_cluster_node_empty_data_dir_error() {
        let mut config = NodeConfig::new(1, loopback_addr());
        config.data_dir = String::new();

        let outcome = ClusterNode::new(config).await;
        assert!(outcome.is_err());
        if let Err(err) = outcome {
            let message = err.to_string();
            assert!(message.contains("Data directory cannot be empty"));
        }
    }

    /// A distributed store can be created from a default configuration.
    #[tokio::test]
    async fn test_distributed_store_creation() {
        let config = NodeConfig::new(1, loopback_addr());

        let store = DistributedStore::new(config).await;
        assert!(store.is_ok());
    }

    /// Error variants render the expected messages.
    #[test]
    fn test_cluster_error_types() {
        let config_err = ClusterError::Config("test error".to_string());
        assert!(config_err
            .to_string()
            .contains("Configuration error: test error"));

        let not_leader = ClusterError::NotLeader;
        assert_eq!(not_leader.to_string(), "Not the leader node");

        let network_err = ClusterError::Network("connection failed".to_string());
        assert!(network_err
            .to_string()
            .contains("Network error: connection failed"));
    }
}