#![allow(clippy::field_reassign_with_default)]
#![allow(clippy::single_match)]
#![allow(clippy::collapsible_if)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::type_complexity)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::manual_clamp)]
#![allow(clippy::needless_range_loop)]
#![allow(clippy::or_fun_call)]
#![allow(clippy::if_same_then_else)]
#![allow(clippy::only_used_in_recursion)]
#![allow(clippy::new_without_default)]
#![allow(clippy::derivable_impls)]
#![allow(clippy::useless_conversion)]
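//! Distributed clustering layer for replicated RDF storage: Raft-based
//! consensus, node discovery, replication, multi-region topology management,
//! conflict resolution, and distributed SPARQL query execution.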
use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;
pub mod adaptive_leader_election;
pub mod advanced_partitioning;
pub mod advanced_storage;
pub mod alerting;
pub mod auto_scaling;
pub mod backup_restore;
pub mod circuit_breaker;
pub mod cloud_integration;
pub mod cluster_metrics;
pub mod compression_strategy;
pub mod conflict_resolution;
pub mod consensus;
pub mod crash_recovery;
pub mod data_rebalancing;
pub mod disaster_recovery;
pub mod discovery;
pub mod distributed_query;
pub mod distributed_tracing;
pub mod edge_computing;
pub mod encryption;
pub mod enhanced_node_discovery;
pub mod enhanced_snapshotting;
pub mod error;
pub mod failover;
pub mod federation;
pub mod gpu_acceleration;
pub mod health_monitor;
pub mod health_monitoring;
pub mod memory_optimization;
pub mod merkle_tree;
pub mod ml_optimization;
pub mod multi_tenant;
pub mod mvcc;
pub mod mvcc_storage;
pub mod network;
pub mod neural_architecture_search;
pub mod node_lifecycle;
pub mod node_status_tracker;
pub mod operational_transformation;
pub mod optimization;
pub mod partition_detection;
pub mod performance_metrics;
pub mod performance_monitor;
pub mod raft;
pub mod raft_optimization;
pub mod raft_profiling;
pub mod raft_state;
pub mod range_partitioning;
pub mod read_replica;
pub mod region_manager;
pub mod replication;
pub mod replication_lag_monitor;
pub mod rl_consensus_optimizer;
pub mod rolling_upgrade;
pub mod rolling_upgrade_orchestrator;
pub mod split_brain_detector;
pub mod visualization_dashboard;
pub mod zero_downtime_migration;
pub mod cross_dc;
pub mod network_compression;
pub mod security;
pub mod serialization;
pub mod shard;
pub mod shard_manager;
pub mod shard_migration;
pub mod shard_routing;
pub mod split_brain_prevention;
pub mod storage;
pub mod strong_consistency;
pub mod tls;
pub mod topology;
pub mod transaction;
pub mod transaction_optimizer;
#[cfg(feature = "bft")]
pub mod bft;
#[cfg(feature = "bft")]
pub mod bft_consensus;
#[cfg(feature = "bft")]
pub mod bft_network;
pub mod gossip_scaling;
pub mod sla_manager;
pub mod stream_integration;
pub mod adaptive_consistent_hash;
pub mod cross_dc_consistency;
pub mod distributed_tx_coordinator;
pub mod vnodes_hash_ring;
pub mod membership_gossip;
pub mod leader_election;
pub mod snapshot_manager;
pub mod consistent_shard_router;
pub mod partition_rebalancer;
pub mod node_monitor;
pub mod failover_manager;
pub mod anti_entropy;
pub mod replication_throttle;
pub mod data_migrator;
pub mod shard_router;
pub mod election_timer;
pub use error::{ClusterError, Result};
pub use failover::{FailoverConfig, FailoverManager, FailoverStrategy, RecoveryAction};
pub use health_monitor::{HealthMonitor, HealthMonitorConfig, NodeHealth, SystemMetrics};
use conflict_resolution::{
ConflictResolver, ResolutionStrategy, TimestampedOperation, VectorClock,
};
use consensus::ConsensusManager;
use discovery::{DiscoveryConfig, DiscoveryService, NodeInfo};
use distributed_query::{DistributedQueryExecutor, ResultBinding};
use edge_computing::{EdgeComputingManager, EdgeDeploymentStrategy, EdgeDeviceProfile};
use raft::{OxirsNodeId, RdfResponse};
use region_manager::{
ConsensusStrategy as RegionConsensusStrategy, MultiRegionReplicationStrategy, Region,
RegionManager,
};
use replication::{ReplicationManager, ReplicationStats, ReplicationStrategy};
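/// Configuration for multi-region deployments: the local region and
/// availability zone, the known regions, and the consensus, replication, and
/// conflict-resolution strategies to apply across them.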
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiRegionConfig {
pub region_id: String,
pub availability_zone_id: String,
pub data_center: Option<String>,
pub rack: Option<String>,
pub regions: Vec<Region>,
pub consensus_strategy: RegionConsensusStrategy,
pub replication_strategy: MultiRegionReplicationStrategy,
pub conflict_resolution_strategy: ResolutionStrategy,
pub edge_config: Option<EdgeComputingConfig>,
pub enable_monitoring: bool,
}
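/// Optional edge-computing settings attached to a multi-region deployment:
/// the local device profile, deployment strategy, and caching/network
/// monitoring toggles.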
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeComputingConfig {
pub enabled: bool,
pub device_profile: EdgeDeviceProfile,
pub deployment_strategy: EdgeDeploymentStrategy,
pub enable_intelligent_caching: bool,
pub enable_network_monitoring: bool,
}
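/// Per-node configuration: identity, listen address, data directory, initial
/// peers, and optional discovery, replication, and multi-region settings.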
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
pub node_id: OxirsNodeId,
pub address: SocketAddr,
pub data_dir: String,
pub peers: Vec<OxirsNodeId>,
pub discovery: Option<DiscoveryConfig>,
pub replication_strategy: Option<ReplicationStrategy>,
#[cfg(feature = "bft")]
pub use_bft: bool,
pub region_config: Option<MultiRegionConfig>,
}
impl NodeConfig {
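/// Creates a configuration for `node_id` listening on `address`, with the
/// default data directory `./data/node-{node_id}`, default discovery and
/// replication settings, and an empty peer list.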
pub fn new(node_id: OxirsNodeId, address: SocketAddr) -> Self {
Self {
node_id,
address,
data_dir: format!("./data/node-{node_id}"),
peers: Vec::new(),
discovery: Some(DiscoveryConfig::default()),
replication_strategy: Some(ReplicationStrategy::default()),
#[cfg(feature = "bft")]
use_bft: false,
region_config: None,
}
}
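/// Adds `peer_id` to the peer list, ignoring duplicates and the node's own id.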
pub fn add_peer(&mut self, peer_id: OxirsNodeId) -> &mut Self {
if !self.peers.contains(&peer_id) && peer_id != self.node_id {
self.peers.push(peer_id);
}
self
}
pub fn with_discovery(mut self, discovery: DiscoveryConfig) -> Self {
self.discovery = Some(discovery);
self
}
pub fn with_replication_strategy(mut self, strategy: ReplicationStrategy) -> Self {
self.replication_strategy = Some(strategy);
self
}
#[cfg(feature = "bft")]
pub fn with_bft(mut self, enable: bool) -> Self {
self.use_bft = enable;
self
}
pub fn with_multi_region(mut self, region_config: MultiRegionConfig) -> Self {
self.region_config = Some(region_config);
self
}
pub fn is_multi_region_enabled(&self) -> bool {
self.region_config.is_some()
}
pub fn region_id(&self) -> Option<&str> {
self.region_config
.as_ref()
.map(|config| config.region_id.as_str())
}
pub fn availability_zone_id(&self) -> Option<&str> {
self.region_config
.as_ref()
.map(|config| config.availability_zone_id.as_str())
}
}
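/// A single member of the cluster. Bundles the consensus, discovery,
/// replication, and distributed-query subsystems, plus optional multi-region
/// and edge-computing managers, behind one node-level API.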
pub struct ClusterNode {
config: NodeConfig,
consensus: ConsensusManager,
discovery: DiscoveryService,
replication: ReplicationManager,
query_executor: DistributedQueryExecutor,
region_manager: Option<Arc<RegionManager>>,
conflict_resolver: Arc<ConflictResolver>,
#[allow(dead_code)]
edge_manager: Option<Arc<EdgeComputingManager>>,
local_vector_clock: Arc<RwLock<VectorClock>>,
running: Arc<RwLock<bool>>,
byzantine_mode: Arc<RwLock<bool>>,
network_isolated: Arc<RwLock<bool>>,
}
impl ClusterNode {
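/// Builds a node from `config`: validates and creates the data directory,
/// wires up consensus, discovery, replication, and query execution, and
/// initializes the region and edge managers when multi-region settings are
/// present. The node is not started; call [`ClusterNode::start`] afterwards.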
pub async fn new(config: NodeConfig) -> Result<Self> {
if config.data_dir.is_empty() {
return Err(ClusterError::Config(
"Data directory cannot be empty".to_string(),
));
}
tokio::fs::create_dir_all(&config.data_dir)
.await
.map_err(|e| ClusterError::Other(format!("Failed to create data directory: {e}")))?;
let consensus = ConsensusManager::new(config.node_id, config.peers.clone());
let discovery_config = config.discovery.clone().unwrap_or_default();
let discovery = DiscoveryService::new(config.node_id, config.address, discovery_config);
let replication_strategy = config.replication_strategy.clone().unwrap_or_default();
let replication = ReplicationManager::new(replication_strategy, config.node_id);
let query_executor = DistributedQueryExecutor::new(config.node_id);
let default_resolution_strategy = if let Some(region_config) = &config.region_config {
region_config.conflict_resolution_strategy.clone()
} else {
ResolutionStrategy::LastWriterWins
};
let conflict_resolver = Arc::new(ConflictResolver::new(default_resolution_strategy));
let mut vector_clock = VectorClock::new();
vector_clock.increment(config.node_id);
let local_vector_clock = Arc::new(RwLock::new(vector_clock));
let region_manager = if let Some(region_config) = &config.region_config {
let manager = Arc::new(RegionManager::new(
region_config.region_id.clone(),
region_config.availability_zone_id.clone(),
region_config.consensus_strategy.clone(),
region_config.replication_strategy.clone(),
));
manager
.initialize(region_config.regions.clone())
.await
.map_err(|e| {
ClusterError::Other(format!("Failed to initialize region manager: {e}"))
})?;
manager
.register_node(
config.node_id,
region_config.region_id.clone(),
region_config.availability_zone_id.clone(),
region_config.data_center.clone(),
region_config.rack.clone(),
)
.await
.map_err(|e| {
ClusterError::Other(format!("Failed to register node in region manager: {e}"))
})?;
Some(manager)
} else {
None
};
let edge_manager = if let Some(region_config) = &config.region_config {
if let Some(edge_config) = &region_config.edge_config {
if edge_config.enabled {
let manager = Arc::new(EdgeComputingManager::new());
manager
.register_device(edge_config.device_profile.clone())
.await
.map_err(|e| {
ClusterError::Other(format!("Failed to register edge device: {e}"))
})?;
Some(manager)
} else {
None
}
} else {
None
}
} else {
None
};
Ok(Self {
config,
consensus,
discovery,
replication,
query_executor,
region_manager,
conflict_resolver,
edge_manager,
local_vector_clock,
running: Arc::new(RwLock::new(false)),
byzantine_mode: Arc::new(RwLock::new(false)),
network_isolated: Arc::new(RwLock::new(false)),
})
}
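/// Starts the node: launches discovery, registers discovered peers with
/// replication and the query executor, initializes consensus, and spawns the
/// background maintenance tasks. Starting an already-running node is a no-op.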
pub async fn start(&mut self) -> Result<()> {
{
let mut running = self.running.write().await;
if *running {
return Ok(());
}
*running = true;
}
tracing::info!(
"Starting cluster node {} at {} with {} peers",
self.config.node_id,
self.config.address,
self.config.peers.len()
);
self.discovery
.start()
.await
.map_err(|e| ClusterError::Other(format!("Failed to start discovery service: {e}")))?;
let discovered_nodes = self
.discovery
.discover_nodes()
.await
.map_err(|e| ClusterError::Other(format!("Failed to discover nodes: {e}")))?;
for node in discovered_nodes {
if node.node_id != self.config.node_id {
self.replication
.add_replica(node.node_id, node.address.to_string());
self.query_executor.add_node(node.node_id).await;
}
}
self.consensus
.init()
.await
.map_err(|e| ClusterError::Other(format!("Failed to initialize consensus: {e}")))?;
tracing::info!("Cluster node {} started successfully", self.config.node_id);
self.start_background_tasks().await;
Ok(())
}
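/// Stops the node by shutting down the discovery service and clearing the
/// running flag; a node that is not running is left untouched.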
pub async fn stop(&mut self) -> Result<()> {
let mut running = self.running.write().await;
if !*running {
return Ok(());
}
tracing::info!("Stopping cluster node {}", self.config.node_id);
self.discovery
.stop()
.await
.map_err(|e| ClusterError::Other(format!("Failed to stop discovery service: {e}")))?;
*running = false;
tracing::info!("Cluster node {} stopped", self.config.node_id);
Ok(())
}
pub async fn is_leader(&self) -> bool {
self.consensus.is_leader().await
}
pub async fn current_term(&self) -> u64 {
self.consensus.current_term().await
}
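/// Inserts a triple through consensus. Returns [`ClusterError::NotLeader`]
/// when called on a follower; writes must be submitted to the current leader.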
pub async fn insert_triple(
&self,
subject: &str,
predicate: &str,
object: &str,
) -> Result<RdfResponse> {
if !self.is_leader().await {
return Err(ClusterError::NotLeader);
}
let response = self
.consensus
.insert_triple(
subject.to_string(),
predicate.to_string(),
object.to_string(),
)
.await?;
Ok(response)
}
pub async fn delete_triple(
&self,
subject: &str,
predicate: &str,
object: &str,
) -> Result<RdfResponse> {
if !self.is_leader().await {
return Err(ClusterError::NotLeader);
}
let response = self
.consensus
.delete_triple(
subject.to_string(),
predicate.to_string(),
object.to_string(),
)
.await?;
Ok(response)
}
pub async fn clear_store(&self) -> Result<RdfResponse> {
if !self.is_leader().await {
return Err(ClusterError::NotLeader);
}
let response = self.consensus.clear_store().await?;
Ok(response)
}
pub async fn begin_transaction(&self) -> Result<String> {
if !self.is_leader().await {
return Err(ClusterError::NotLeader);
}
let tx_id = uuid::Uuid::new_v4().to_string();
let _response = self.consensus.begin_transaction(tx_id.clone()).await?;
Ok(tx_id)
}
pub async fn commit_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
if !self.is_leader().await {
return Err(ClusterError::NotLeader);
}
let response = self.consensus.commit_transaction(tx_id.to_string()).await?;
Ok(response)
}
pub async fn rollback_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
if !self.is_leader().await {
return Err(ClusterError::NotLeader);
}
let response = self
.consensus
.rollback_transaction(tx_id.to_string())
.await?;
Ok(response)
}
pub async fn query_triples(
&self,
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Vec<(String, String, String)> {
self.consensus.query(subject, predicate, object).await
}
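/// Executes a SPARQL query through the distributed query executor and
/// flattens each result binding into a `"var: value"` string per solution.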
pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
let bindings = self
.query_executor
.execute_query(sparql)
.await
.map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))?;
let results = bindings
.into_iter()
.map(|binding| {
let vars: Vec<String> = binding
.variables
.into_iter()
.map(|(var, val)| format!("{var}: {val}"))
.collect();
vars.join(", ")
})
.collect();
Ok(results)
}
pub async fn query_sparql_bindings(&self, sparql: &str) -> Result<Vec<ResultBinding>> {
self.query_executor
.execute_query(sparql)
.await
.map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))
}
pub async fn get_query_statistics(
&self,
) -> Result<std::collections::HashMap<String, distributed_query::QueryStats>> {
Ok(self.query_executor.get_statistics().await)
}
pub async fn clear_query_cache(&self) -> Result<()> {
self.query_executor.clear_cache().await;
Ok(())
}
pub async fn len(&self) -> usize {
self.consensus.len().await
}
pub async fn is_empty(&self) -> bool {
self.consensus.is_empty().await
}
pub async fn add_cluster_node(
&mut self,
node_id: OxirsNodeId,
address: SocketAddr,
) -> Result<()> {
if node_id == self.config.node_id {
return Err(ClusterError::Config(
"Cannot add self to cluster".to_string(),
));
}
self.config.add_peer(node_id);
let node_info = NodeInfo::new(node_id, address);
self.discovery.add_node(node_info);
self.replication.add_replica(node_id, address.to_string());
self.query_executor.add_node(node_id).await;
self.consensus.add_peer(node_id);
tracing::info!("Added node {} at {} to cluster", node_id, address);
Ok(())
}
pub async fn remove_cluster_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
if node_id == self.config.node_id {
return Err(ClusterError::Config(
"Cannot remove self from cluster".to_string(),
));
}
self.config.peers.retain(|&id| id != node_id);
self.discovery.remove_node(node_id);
self.replication.remove_replica(node_id);
self.query_executor.remove_node(node_id).await;
self.consensus.remove_peer(node_id);
tracing::info!("Removed node {} from cluster", node_id);
Ok(())
}
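/// Collects a point-in-time [`ClusterStatus`] snapshot from the consensus,
/// discovery, and replication subsystems, including regional information when
/// multi-region mode is enabled.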
pub async fn get_status(&self) -> ClusterStatus {
let consensus_status = self.consensus.get_status().await;
let discovery_stats = self.discovery.get_stats().clone();
let replication_stats = self.replication.get_stats().clone();
let region_status = if let Some(region_manager) = &self.region_manager {
let region_id = region_manager.get_local_region().to_string();
let availability_zone_id = region_manager.get_local_availability_zone().to_string();
let regional_peers = region_manager.get_nodes_in_region(&region_id).await;
let topology = region_manager.get_topology().await;
let monitoring_active = region_manager.is_monitoring_active().await;
Some(RegionStatus {
region_id,
availability_zone_id,
regional_peer_count: regional_peers.len(),
total_regions: topology.regions.len(),
monitoring_active,
})
} else {
None
};
ClusterStatus {
node_id: self.config.node_id,
address: self.config.address,
is_leader: consensus_status.is_leader,
current_term: consensus_status.current_term,
peer_count: consensus_status.peer_count,
triple_count: consensus_status.triple_count,
discovery_stats,
replication_stats,
is_running: *self.running.read().await,
region_status,
}
}
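/// Spawns background maintenance: a periodic discovery task that loops while
/// the node is marked running, and a single replication maintenance pass.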
async fn start_background_tasks(&mut self) {
let running = Arc::clone(&self.running);
let discovery_config = self.config.discovery.clone().unwrap_or_default();
let mut discovery_clone =
DiscoveryService::new(self.config.node_id, self.config.address, discovery_config);
tokio::spawn(async move {
while *running.read().await {
discovery_clone.run_periodic_tasks().await;
tokio::time::sleep(std::time::Duration::from_secs(1)).await;
}
});
let mut replication_clone = ReplicationManager::with_raft_consensus(self.config.node_id);
let running_clone = Arc::clone(&self.running);
tokio::spawn(async move {
if *running_clone.read().await {
replication_clone.run_maintenance().await;
}
});
}
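/// Adds a node through a consensus-backed membership change, then registers
/// it with discovery, replication, and the query executor.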
pub async fn add_node_with_consensus(
&mut self,
node_id: OxirsNodeId,
address: SocketAddr,
) -> Result<()> {
self.consensus
.add_node_with_consensus(node_id, address.to_string())
.await
.map_err(|e| {
ClusterError::Other(format!("Failed to add node through consensus: {e}"))
})?;
self.config.add_peer(node_id);
let node_info = NodeInfo::new(node_id, address);
self.discovery.add_node(node_info);
self.replication.add_replica(node_id, address.to_string());
self.query_executor.add_node(node_id).await;
Ok(())
}
pub async fn remove_node_with_consensus(&mut self, node_id: OxirsNodeId) -> Result<()> {
self.consensus
.remove_node_with_consensus(node_id)
.await
.map_err(|e| {
ClusterError::Other(format!("Failed to remove node through consensus: {e}"))
})?;
self.config.peers.retain(|&id| id != node_id);
self.discovery.remove_node(node_id);
self.replication.remove_replica(node_id);
self.query_executor.remove_node(node_id).await;
Ok(())
}
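/// Performs an orderly shutdown: clears the running flag, asks consensus to
/// shut down gracefully, then stops the discovery service.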
pub async fn graceful_shutdown(&mut self) -> Result<()> {
tracing::info!(
"Initiating graceful shutdown of cluster node {}",
self.config.node_id
);
{
let mut running = self.running.write().await;
*running = false;
}
self.consensus
.graceful_shutdown()
.await
.map_err(|e| ClusterError::Other(format!("Failed to shutdown consensus: {e}")))?;
self.discovery
.stop()
.await
.map_err(|e| ClusterError::Other(format!("Failed to stop discovery: {e}")))?;
tracing::info!("Cluster node {} gracefully shutdown", self.config.node_id);
Ok(())
}
pub async fn transfer_leadership(&mut self, target_node: OxirsNodeId) -> Result<()> {
if !self.config.peers.contains(&target_node) {
return Err(ClusterError::Config(format!(
"Target node {target_node} not in cluster"
)));
}
self.consensus
.transfer_leadership(target_node)
.await
.map_err(|e| ClusterError::Other(format!("Failed to transfer leadership: {e}")))?;
Ok(())
}
pub async fn force_evict_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
self.consensus
.force_evict_node(node_id)
.await
.map_err(|e| ClusterError::Other(format!("Failed to force evict node: {e}")))?;
self.config.peers.retain(|&id| id != node_id);
self.discovery.remove_node(node_id);
self.replication.remove_replica(node_id);
self.query_executor.remove_node(node_id).await;
Ok(())
}
pub async fn check_cluster_health(&self) -> Result<Vec<consensus::NodeHealthStatus>> {
self.consensus
.check_peer_health()
.await
.map_err(|e| ClusterError::Other(format!("Failed to check cluster health: {e}")))
}
pub async fn attempt_recovery(&mut self) -> Result<()> {
self.consensus
.attempt_recovery()
.await
.map_err(|e| ClusterError::Other(format!("Failed to recover cluster: {e}")))?;
tracing::info!(
"Cluster recovery completed for node {}",
self.config.node_id
);
Ok(())
}
pub fn id(&self) -> OxirsNodeId {
self.config.node_id
}
pub async fn count_triples(&self) -> Result<usize> {
Ok(self.len().await)
}
pub async fn is_active(&self) -> Result<bool> {
Ok(*self.running.read().await && !*self.network_isolated.read().await)
}
pub async fn isolate_network(&self) -> Result<()> {
let mut isolated = self.network_isolated.write().await;
*isolated = true;
tracing::info!("Node {} network isolated", self.config.node_id);
Ok(())
}
pub async fn restore_network(&self) -> Result<()> {
let mut isolated = self.network_isolated.write().await;
*isolated = false;
tracing::info!("Node {} network restored", self.config.node_id);
Ok(())
}
pub async fn enable_byzantine_mode(&self) -> Result<()> {
let mut byzantine = self.byzantine_mode.write().await;
*byzantine = true;
tracing::info!("Node {} Byzantine mode enabled", self.config.node_id);
Ok(())
}
pub async fn is_byzantine(&self) -> Result<bool> {
Ok(*self.byzantine_mode.read().await)
}
pub fn region_manager(&self) -> Option<&Arc<RegionManager>> {
self.region_manager.as_ref()
}
pub fn is_multi_region_enabled(&self) -> bool {
self.region_manager.is_some()
}
pub fn get_region_id(&self) -> Option<String> {
self.region_manager
.as_ref()
.map(|rm| rm.get_local_region().to_string())
}
pub fn get_availability_zone_id(&self) -> Option<String> {
self.region_manager
.as_ref()
.map(|rm| rm.get_local_availability_zone().to_string())
}
pub async fn get_regional_peers(&self) -> Result<Vec<OxirsNodeId>> {
if let Some(region_manager) = &self.region_manager {
let region_id = region_manager.get_local_region();
Ok(region_manager.get_nodes_in_region(region_id).await)
} else {
Err(ClusterError::Config(
"Multi-region not configured".to_string(),
))
}
}
pub async fn get_regional_leader_candidates(&self) -> Result<Vec<OxirsNodeId>> {
if let Some(region_manager) = &self.region_manager {
let region_id = region_manager.get_local_region();
Ok(region_manager.get_leader_candidates(region_id).await)
} else {
Ok(self.config.peers.clone())
}
}
pub async fn get_cross_region_replication_targets(&self) -> Result<Vec<String>> {
if let Some(region_manager) = &self.region_manager {
let region_id = region_manager.get_local_region();
region_manager
.calculate_replication_targets(region_id)
.await
.map_err(|e| {
ClusterError::Other(format!("Failed to calculate replication targets: {e}"))
})
} else {
Ok(Vec::new())
}
}
pub async fn monitor_region_latencies(&self) -> Result<()> {
if let Some(region_manager) = &self.region_manager {
region_manager.monitor_latencies().await.map_err(|e| {
ClusterError::Other(format!("Failed to monitor region latencies: {e}"))
})
} else {
Ok(())
}
}
pub async fn get_region_health(&self, region_id: &str) -> Result<region_manager::RegionHealth> {
if let Some(region_manager) = &self.region_manager {
region_manager
.get_region_health(region_id)
.await
.map_err(|e| ClusterError::Other(format!("Failed to get region health: {e}")))
} else {
Err(ClusterError::Config(
"Multi-region not configured".to_string(),
))
}
}
pub async fn perform_region_failover(
&self,
failed_region: &str,
target_region: &str,
) -> Result<()> {
if let Some(region_manager) = &self.region_manager {
region_manager
.perform_region_failover(failed_region, target_region)
.await
.map_err(|e| ClusterError::Other(format!("Failed to perform region failover: {e}")))
} else {
Err(ClusterError::Config(
"Multi-region not configured".to_string(),
))
}
}
pub async fn get_region_topology(&self) -> Result<region_manager::RegionTopology> {
if let Some(region_manager) = &self.region_manager {
Ok(region_manager.get_topology().await)
} else {
Err(ClusterError::Config(
"Multi-region not configured".to_string(),
))
}
}
pub async fn add_node_to_region(
&self,
node_id: OxirsNodeId,
region_id: String,
availability_zone_id: String,
data_center: Option<String>,
rack: Option<String>,
) -> Result<()> {
if let Some(region_manager) = &self.region_manager {
region_manager
.register_node(node_id, region_id, availability_zone_id, data_center, rack)
.await
.map_err(|e| ClusterError::Other(format!("Failed to add node to region: {e}")))
} else {
Err(ClusterError::Config(
"Multi-region not configured".to_string(),
))
}
}
pub fn conflict_resolver(&self) -> &Arc<ConflictResolver> {
&self.conflict_resolver
}
pub async fn get_vector_clock(&self) -> VectorClock {
self.local_vector_clock.read().await.clone()
}
pub async fn update_vector_clock(&self, received_clock: &VectorClock) {
let mut clock = self.local_vector_clock.write().await;
clock.update(received_clock);
clock.increment(self.config.node_id);
}
pub async fn create_timestamped_operation(
&self,
operation: conflict_resolution::RdfOperation,
priority: u32,
) -> TimestampedOperation {
let mut clock = self.local_vector_clock.write().await;
clock.increment(self.config.node_id);
TimestampedOperation {
operation_id: uuid::Uuid::new_v4().to_string(),
origin_node: self.config.node_id,
vector_clock: clock.clone(),
physical_time: std::time::SystemTime::now(),
operation,
priority,
}
}
pub async fn detect_operation_conflicts(
&self,
operations: &[TimestampedOperation],
) -> Result<Vec<conflict_resolution::ConflictType>> {
self.conflict_resolver
.detect_conflicts(operations)
.await
.map_err(|e| ClusterError::Other(format!("Failed to detect conflicts: {e}")))
}
pub async fn resolve_operation_conflicts(
&self,
conflicts: &[conflict_resolution::ConflictType],
) -> Result<Vec<conflict_resolution::ResolutionResult>> {
self.conflict_resolver
.resolve_conflicts(conflicts)
.await
.map_err(|e| ClusterError::Other(format!("Failed to resolve conflicts: {e}")))
}
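/// Stamps `operation` with the local vector clock and applies it through the
/// leader. Batch operations are currently acknowledged without being applied
/// individually.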
pub async fn submit_conflict_aware_operation(
&self,
operation: conflict_resolution::RdfOperation,
priority: u32,
) -> Result<RdfResponse> {
let _timestamped_op = self
.create_timestamped_operation(operation.clone(), priority)
.await;
match operation {
conflict_resolution::RdfOperation::Insert {
subject,
predicate,
object,
..
} => self.insert_triple(&subject, &predicate, &object).await,
conflict_resolution::RdfOperation::Delete {
subject,
predicate,
object,
..
} => self.delete_triple(&subject, &predicate, &object).await,
conflict_resolution::RdfOperation::Clear { .. } => self.clear_store().await,
conflict_resolution::RdfOperation::Update {
old_triple,
new_triple,
..
} => {
let _delete_result = self
.delete_triple(&old_triple.0, &old_triple.1, &old_triple.2)
.await?;
self.insert_triple(&new_triple.0, &new_triple.1, &new_triple.2)
.await
}
conflict_resolution::RdfOperation::Batch { operations: _ } => {
Ok(RdfResponse::Success)
}
}
}
pub async fn get_conflict_resolution_statistics(
&self,
) -> conflict_resolution::ResolutionStatistics {
self.conflict_resolver.get_statistics().await
}
}
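/// Point-in-time snapshot of a node's view of the cluster, as returned by
/// [`ClusterNode::get_status`].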
#[derive(Debug, Clone)]
pub struct ClusterStatus {
pub node_id: OxirsNodeId,
pub address: SocketAddr,
pub is_leader: bool,
pub current_term: u64,
pub peer_count: usize,
pub triple_count: usize,
pub discovery_stats: discovery::DiscoveryStats,
pub replication_stats: ReplicationStats,
pub is_running: bool,
pub region_status: Option<RegionStatus>,
}
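/// Regional portion of [`ClusterStatus`], present when multi-region mode is
/// configured.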
#[derive(Debug, Clone)]
pub struct RegionStatus {
pub region_id: String,
pub availability_zone_id: String,
pub regional_peer_count: usize,
pub total_regions: usize,
pub monitoring_active: bool,
}
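/// A thin convenience facade over [`ClusterNode`] that exposes a store-like
/// API (insert, SPARQL query, pattern query) without the cluster-management
/// surface.
///
/// A minimal usage sketch (marked `ignore`; the `oxirs_cluster` crate path is
/// an assumption, and writes succeed only on the elected leader):
///
/// ```ignore
/// use oxirs_cluster::{DistributedStore, NodeConfig};
/// use std::net::SocketAddr;
///
/// async fn demo() -> oxirs_cluster::Result<()> {
///     let addr: SocketAddr = "127.0.0.1:8080".parse().unwrap();
///     let mut store = DistributedStore::new(NodeConfig::new(1, addr)).await?;
///     store.start().await?;
///     store
///         .insert_triple("http://example.org/s", "http://example.org/p", "\"o\"")
///         .await?;
///     let _matches = store
///         .query_pattern(Some("http://example.org/s"), None, None)
///         .await;
///     store.stop().await?;
///     Ok(())
/// }
/// ```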
pub struct DistributedStore {
node: ClusterNode,
}
impl DistributedStore {
pub async fn new(config: NodeConfig) -> Result<Self> {
let node = ClusterNode::new(config).await?;
Ok(Self { node })
}
pub async fn start(&mut self) -> Result<()> {
self.node.start().await
}
pub async fn stop(&mut self) -> Result<()> {
self.node.stop().await
}
pub async fn insert_triple(
&mut self,
subject: &str,
predicate: &str,
object: &str,
) -> Result<()> {
let _response = self.node.insert_triple(subject, predicate, object).await?;
Ok(())
}
pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
self.node.query_sparql(sparql).await
}
pub async fn query_pattern(
&self,
subject: Option<&str>,
predicate: Option<&str>,
object: Option<&str>,
) -> Vec<(String, String, String)> {
self.node.query_triples(subject, predicate, object).await
}
pub async fn get_status(&self) -> ClusterStatus {
self.node.get_status().await
}
}
pub use consensus::ConsensusError;
pub use discovery::DiscoveryError;
pub use replication::ReplicationError;
#[cfg(test)]
mod tests {
use super::*;
use std::net::{IpAddr, Ipv4Addr};
#[tokio::test]
async fn test_node_config_creation() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let config = NodeConfig::new(1, addr);
assert_eq!(config.node_id, 1);
assert_eq!(config.address, addr);
assert_eq!(config.data_dir, "./data/node-1");
assert!(config.peers.is_empty());
assert!(config.discovery.is_some());
assert!(config.replication_strategy.is_some());
assert!(config.region_config.is_none());
}
#[tokio::test]
async fn test_node_config_add_peer() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let mut config = NodeConfig::new(1, addr);
config.add_peer(2);
config.add_peer(3);
config.add_peer(2);
assert_eq!(config.peers, vec![2, 3]);
}
#[tokio::test]
async fn test_node_config_no_self_peer() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let mut config = NodeConfig::new(1, addr);
config.add_peer(1);
assert!(config.peers.is_empty());
}
#[tokio::test]
async fn test_cluster_node_creation() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let config = NodeConfig::new(1, addr);
let node = ClusterNode::new(config).await;
assert!(node.is_ok());
let node = node.unwrap();
assert_eq!(node.config.node_id, 1);
assert_eq!(node.config.address, addr);
}
#[tokio::test]
async fn test_cluster_node_empty_data_dir_error() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let mut config = NodeConfig::new(1, addr);
config.data_dir = String::new();
let result = ClusterNode::new(config).await;
assert!(result.is_err());
if let Err(e) = result {
assert!(e.to_string().contains("Data directory cannot be empty"));
}
}
#[tokio::test]
async fn test_distributed_store_creation() {
let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
let config = NodeConfig::new(1, addr);
let store = DistributedStore::new(config).await;
assert!(store.is_ok());
}
#[test]
fn test_cluster_error_types() {
let err = ClusterError::Config("test error".to_string());
assert!(err.to_string().contains("Configuration error: test error"));
let err = ClusterError::NotLeader;
assert_eq!(err.to_string(), "Not the leader node");
let err = ClusterError::Network("connection failed".to_string());
assert!(err.to_string().contains("Network error: connection failed"));
}
}