1#![allow(clippy::field_reassign_with_default)]
53#![allow(clippy::single_match)]
54#![allow(clippy::collapsible_if)]
55#![allow(clippy::clone_on_copy)]
56#![allow(clippy::type_complexity)]
57#![allow(clippy::collapsible_match)]
58#![allow(clippy::manual_clamp)]
59#![allow(clippy::needless_range_loop)]
60#![allow(clippy::or_fun_call)]
61#![allow(clippy::if_same_then_else)]
62#![allow(clippy::only_used_in_recursion)]
63#![allow(clippy::new_without_default)]
64#![allow(clippy::derivable_impls)]
65#![allow(clippy::useless_conversion)]
66use serde::{Deserialize, Serialize};
67use std::net::SocketAddr;
68use std::sync::Arc;
69use tokio::sync::RwLock;
70
71pub mod adaptive_leader_election;
72pub mod advanced_partitioning;
73pub mod advanced_storage;
74pub mod alerting;
75pub mod auto_scaling;
76pub mod backup_restore;
77pub mod circuit_breaker;
78pub mod cloud_integration;
79pub mod cluster_metrics;
80pub mod compression_strategy;
81pub mod conflict_resolution;
82pub mod consensus;
83pub mod crash_recovery;
84pub mod data_rebalancing;
85pub mod disaster_recovery;
86pub mod discovery;
87pub mod distributed_query;
88pub mod distributed_tracing;
89pub mod edge_computing;
90pub mod encryption;
91pub mod enhanced_node_discovery;
92pub mod enhanced_snapshotting;
93pub mod error;
94pub mod failover;
95pub mod federation;
96pub mod gpu_acceleration;
97pub mod health_monitor;
98pub mod health_monitoring;
99pub mod memory_optimization;
100pub mod merkle_tree;
101pub mod ml_optimization;
102pub mod multi_tenant;
103pub mod mvcc;
104pub mod mvcc_storage;
105pub mod network;
106pub mod neural_architecture_search;
107pub mod node_lifecycle;
108pub mod node_status_tracker;
109pub mod operational_transformation;
110pub mod optimization;
111pub mod partition_detection;
112pub mod performance_metrics;
113pub mod performance_monitor;
114pub mod raft;
115pub mod raft_optimization;
116pub mod raft_profiling;
117pub mod raft_state;
118pub mod range_partitioning;
119pub mod read_replica;
120pub mod region_manager;
121pub mod replication;
122pub mod replication_lag_monitor;
123pub mod rl_consensus_optimizer;
124pub mod rolling_upgrade;
125pub mod rolling_upgrade_orchestrator;
126pub mod split_brain_detector;
127pub mod visualization_dashboard;
128pub mod zero_downtime_migration;
129pub mod cross_dc;
132pub mod network_compression;
133pub mod security;
134pub mod serialization;
135pub mod shard;
136pub mod shard_manager;
137pub mod shard_migration;
138pub mod shard_routing;
139pub mod split_brain_prevention;
140pub mod storage;
141pub mod strong_consistency;
142pub mod tls;
143pub mod topology;
144pub mod transaction;
145pub mod transaction_optimizer;
146
147#[cfg(feature = "bft")]
148pub mod bft;
149#[cfg(feature = "bft")]
150pub mod bft_consensus;
151#[cfg(feature = "bft")]
152pub mod bft_network;
153
154pub mod gossip_scaling;
155pub mod sla_manager;
156pub mod stream_integration;
157
158pub mod adaptive_consistent_hash;
160pub mod cross_dc_consistency;
161pub mod distributed_tx_coordinator;
162
163pub mod vnodes_hash_ring;
165
166pub mod membership_gossip;
168
169pub mod leader_election;
171
172pub mod snapshot_manager;
174
175pub mod consistent_shard_router;
177
178pub mod partition_rebalancer;
180
181pub mod node_monitor;
183
184pub mod failover_manager;
186
187pub mod anti_entropy;
189
190pub mod replication_throttle;
192
193pub mod data_migrator;
196
197pub mod shard_router;
199
200pub mod election_timer;
203
204pub use error::{ClusterError, Result};
205pub use failover::{FailoverConfig, FailoverManager, FailoverStrategy, RecoveryAction};
206pub use health_monitor::{HealthMonitor, HealthMonitorConfig, NodeHealth, SystemMetrics};
207
208use conflict_resolution::{
219 ConflictResolver, ResolutionStrategy, TimestampedOperation, VectorClock,
220};
221use consensus::ConsensusManager;
222use discovery::{DiscoveryConfig, DiscoveryService, NodeInfo};
223use distributed_query::{DistributedQueryExecutor, ResultBinding};
224use edge_computing::{EdgeComputingManager, EdgeDeploymentStrategy, EdgeDeviceProfile};
225use raft::{OxirsNodeId, RdfResponse};
226use region_manager::{
227 ConsensusStrategy as RegionConsensusStrategy, MultiRegionReplicationStrategy, Region,
228 RegionManager,
229};
230use replication::{ReplicationManager, ReplicationStats, ReplicationStrategy};
231
/// Configuration for a node participating in a multi-region deployment.
///
/// Carried inside [`NodeConfig::region_config`]; when present, the node
/// constructs a `RegionManager` (and optionally an edge-computing manager)
/// during [`ClusterNode::new`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiRegionConfig {
    /// Identifier of the region this node belongs to.
    pub region_id: String,
    /// Identifier of the availability zone within the region.
    pub availability_zone_id: String,
    /// Optional data-center label within the availability zone.
    pub data_center: Option<String>,
    /// Optional rack label within the data center.
    pub rack: Option<String>,
    /// All regions known to the cluster topology at startup.
    pub regions: Vec<Region>,
    /// Consensus strategy applied across regions.
    pub consensus_strategy: RegionConsensusStrategy,
    /// Replication strategy for cross-region data copies.
    pub replication_strategy: MultiRegionReplicationStrategy,
    /// Strategy used to resolve conflicting concurrent updates.
    pub conflict_resolution_strategy: ResolutionStrategy,
    /// Optional edge-computing configuration; `None` disables edge features.
    pub edge_config: Option<EdgeComputingConfig>,
    /// Whether region-level monitoring should be enabled.
    pub enable_monitoring: bool,
}
256
/// Edge-computing settings nested inside [`MultiRegionConfig`].
///
/// Only consulted when `enabled` is `true`; see the `edge_manager`
/// construction in [`ClusterNode::new`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeComputingConfig {
    /// Master switch; when `false` no edge manager is created.
    pub enabled: bool,
    /// Hardware/capability profile of the local edge device.
    pub device_profile: EdgeDeviceProfile,
    /// Strategy used to place workloads onto edge devices.
    pub deployment_strategy: EdgeDeploymentStrategy,
    /// Enables intelligent caching on the edge tier.
    pub enable_intelligent_caching: bool,
    /// Enables network-quality monitoring for edge links.
    pub enable_network_monitoring: bool,
}
271
/// Static configuration for a single cluster node.
///
/// Built via [`NodeConfig::new`] plus the `with_*` builder methods, then
/// consumed by [`ClusterNode::new`].
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Unique identifier of this node within the cluster.
    pub node_id: OxirsNodeId,
    /// Socket address this node binds/advertises.
    pub address: SocketAddr,
    /// Directory for persistent state; must be non-empty (validated in `ClusterNode::new`).
    pub data_dir: String,
    /// Known peer node ids (never contains `node_id` itself; see `add_peer`).
    pub peers: Vec<OxirsNodeId>,
    /// Discovery settings; `None` falls back to `DiscoveryConfig::default()`.
    pub discovery: Option<DiscoveryConfig>,
    /// Replication strategy; `None` falls back to `ReplicationStrategy::default()`.
    pub replication_strategy: Option<ReplicationStrategy>,
    /// Enables Byzantine-fault-tolerant consensus (only with the `bft` feature).
    #[cfg(feature = "bft")]
    pub use_bft: bool,
    /// Multi-region settings; `None` means single-region operation.
    pub region_config: Option<MultiRegionConfig>,
}
293
294impl NodeConfig {
295 pub fn new(node_id: OxirsNodeId, address: SocketAddr) -> Self {
297 Self {
298 node_id,
299 address,
300 data_dir: format!("./data/node-{node_id}"),
301 peers: Vec::new(),
302 discovery: Some(DiscoveryConfig::default()),
303 replication_strategy: Some(ReplicationStrategy::default()),
304 #[cfg(feature = "bft")]
305 use_bft: false,
306 region_config: None,
307 }
308 }
309
310 pub fn add_peer(&mut self, peer_id: OxirsNodeId) -> &mut Self {
312 if !self.peers.contains(&peer_id) && peer_id != self.node_id {
313 self.peers.push(peer_id);
314 }
315 self
316 }
317
318 pub fn with_discovery(mut self, discovery: DiscoveryConfig) -> Self {
320 self.discovery = Some(discovery);
321 self
322 }
323
324 pub fn with_replication_strategy(mut self, strategy: ReplicationStrategy) -> Self {
326 self.replication_strategy = Some(strategy);
327 self
328 }
329
330 #[cfg(feature = "bft")]
332 pub fn with_bft(mut self, enable: bool) -> Self {
333 self.use_bft = enable;
334 self
335 }
336
337 pub fn with_multi_region(mut self, region_config: MultiRegionConfig) -> Self {
339 self.region_config = Some(region_config);
340 self
341 }
342
343 pub fn is_multi_region_enabled(&self) -> bool {
345 self.region_config.is_some()
346 }
347
348 pub fn region_id(&self) -> Option<&str> {
350 self.region_config
351 .as_ref()
352 .map(|config| config.region_id.as_str())
353 }
354
355 pub fn availability_zone_id(&self) -> Option<&str> {
357 self.region_config
358 .as_ref()
359 .map(|config| config.availability_zone_id.as_str())
360 }
361}
362
/// A running member of the distributed cluster.
///
/// Owns the consensus, discovery, replication, and query subsystems; shared
/// flags (`running`, `byzantine_mode`, `network_isolated`) are wrapped in
/// `Arc<RwLock<_>>` so background tasks can observe them.
pub struct ClusterNode {
    // Immutable-ish node configuration (peers list is mutated by add/remove APIs).
    config: NodeConfig,
    // Raft-style consensus layer; all writes funnel through it.
    consensus: ConsensusManager,
    // Node discovery service (started/stopped with the node).
    discovery: DiscoveryService,
    // Replication manager tracking replica peers.
    replication: ReplicationManager,
    // Distributed SPARQL query executor.
    query_executor: DistributedQueryExecutor,
    // Present only when multi-region is configured.
    region_manager: Option<Arc<RegionManager>>,
    // Conflict detection/resolution for concurrent RDF operations.
    conflict_resolver: Arc<ConflictResolver>,
    // Present only when edge computing is enabled in the region config.
    #[allow(dead_code)]
    edge_manager: Option<Arc<EdgeComputingManager>>,
    // Logical clock for causality tracking; incremented on local operations.
    local_vector_clock: Arc<RwLock<VectorClock>>,
    // True between start() and stop()/graceful_shutdown().
    running: Arc<RwLock<bool>>,
    // Test/fault-injection flag: node behaves byzantine when true.
    byzantine_mode: Arc<RwLock<bool>>,
    // Test/fault-injection flag: simulated network partition when true.
    network_isolated: Arc<RwLock<bool>>,
}
379
impl ClusterNode {
    /// Creates a node from `config`: validates and creates the data directory,
    /// then constructs the consensus, discovery, replication, query, conflict
    /// resolution and (optionally) region/edge subsystems.
    ///
    /// # Errors
    /// Returns `ClusterError::Config` for an empty `data_dir`, and
    /// `ClusterError::Other` for I/O or subsystem initialization failures.
    pub async fn new(config: NodeConfig) -> Result<Self> {
        if config.data_dir.is_empty() {
            return Err(ClusterError::Config(
                "Data directory cannot be empty".to_string(),
            ));
        }

        tokio::fs::create_dir_all(&config.data_dir)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to create data directory: {e}")))?;

        let consensus = ConsensusManager::new(config.node_id, config.peers.clone());

        let discovery_config = config.discovery.clone().unwrap_or_default();
        let discovery = DiscoveryService::new(config.node_id, config.address, discovery_config);

        let replication_strategy = config.replication_strategy.clone().unwrap_or_default();
        let replication = ReplicationManager::new(replication_strategy, config.node_id);

        let query_executor = DistributedQueryExecutor::new(config.node_id);

        // Multi-region deployments use the configured strategy; single-region
        // nodes default to last-writer-wins.
        let default_resolution_strategy = if let Some(region_config) = &config.region_config {
            region_config.conflict_resolution_strategy.clone()
        } else {
            ResolutionStrategy::LastWriterWins
        };
        let conflict_resolver = Arc::new(ConflictResolver::new(default_resolution_strategy));

        // Seed the local vector clock with one tick for this node.
        let mut vector_clock = VectorClock::new();
        vector_clock.increment(config.node_id);
        let local_vector_clock = Arc::new(RwLock::new(vector_clock));

        // Region manager is only built (and this node registered in it) when
        // a multi-region config is supplied.
        let region_manager = if let Some(region_config) = &config.region_config {
            let manager = Arc::new(RegionManager::new(
                region_config.region_id.clone(),
                region_config.availability_zone_id.clone(),
                region_config.consensus_strategy.clone(),
                region_config.replication_strategy.clone(),
            ));

            manager
                .initialize(region_config.regions.clone())
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to initialize region manager: {e}"))
                })?;

            manager
                .register_node(
                    config.node_id,
                    region_config.region_id.clone(),
                    region_config.availability_zone_id.clone(),
                    region_config.data_center.clone(),
                    region_config.rack.clone(),
                )
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to register node in region manager: {e}"))
                })?;

            Some(manager)
        } else {
            None
        };

        // Edge manager requires region config + edge config + enabled flag.
        let edge_manager = if let Some(region_config) = &config.region_config {
            if let Some(edge_config) = &region_config.edge_config {
                if edge_config.enabled {
                    let manager = Arc::new(EdgeComputingManager::new());

                    manager
                        .register_device(edge_config.device_profile.clone())
                        .await
                        .map_err(|e| {
                            ClusterError::Other(format!("Failed to register edge device: {e}"))
                        })?;

                    Some(manager)
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        Ok(Self {
            config,
            consensus,
            discovery,
            replication,
            query_executor,
            region_manager,
            conflict_resolver,
            edge_manager,
            local_vector_clock,
            running: Arc::new(RwLock::new(false)),
            byzantine_mode: Arc::new(RwLock::new(false)),
            network_isolated: Arc::new(RwLock::new(false)),
        })
    }

    /// Starts the node: flips the running flag, starts discovery, registers
    /// discovered peers with replication/query layers, initializes consensus,
    /// then spawns background maintenance tasks. Idempotent — a second call
    /// while running returns `Ok(())` immediately.
    pub async fn start(&mut self) -> Result<()> {
        {
            // Scoped so the write lock is released before the awaits below.
            let mut running = self.running.write().await;
            if *running {
                return Ok(());
            }
            *running = true;
        }

        tracing::info!(
            "Starting cluster node {} at {} with {} peers",
            self.config.node_id,
            self.config.address,
            self.config.peers.len()
        );

        self.discovery
            .start()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to start discovery service: {e}")))?;

        let discovered_nodes = self
            .discovery
            .discover_nodes()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to discover nodes: {e}")))?;

        // Wire every discovered peer (excluding self) into replication and
        // distributed query routing.
        for node in discovered_nodes {
            if node.node_id != self.config.node_id {
                self.replication
                    .add_replica(node.node_id, node.address.to_string());
                self.query_executor.add_node(node.node_id).await;
            }
        }

        self.consensus
            .init()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to initialize consensus: {e}")))?;

        tracing::info!("Cluster node {} started successfully", self.config.node_id);

        self.start_background_tasks().await;

        Ok(())
    }

    /// Stops the node: stops discovery and clears the running flag.
    /// Idempotent — returns `Ok(())` if already stopped.
    pub async fn stop(&mut self) -> Result<()> {
        let mut running = self.running.write().await;
        if !*running {
            return Ok(());
        }

        tracing::info!("Stopping cluster node {}", self.config.node_id);

        self.discovery
            .stop()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery service: {e}")))?;

        *running = false;

        tracing::info!("Cluster node {} stopped", self.config.node_id);

        Ok(())
    }

    /// Returns `true` if this node is currently the consensus leader.
    pub async fn is_leader(&self) -> bool {
        self.consensus.is_leader().await
    }

    /// Returns the current consensus term.
    pub async fn current_term(&self) -> u64 {
        self.consensus.current_term().await
    }

    /// Inserts an RDF triple through consensus.
    ///
    /// # Errors
    /// `ClusterError::NotLeader` if called on a follower; consensus errors otherwise.
    pub async fn insert_triple(
        &self,
        subject: &str,
        predicate: &str,
        object: &str,
    ) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self
            .consensus
            .insert_triple(
                subject.to_string(),
                predicate.to_string(),
                object.to_string(),
            )
            .await?;

        Ok(response)
    }

    /// Deletes an RDF triple through consensus (leader-only).
    pub async fn delete_triple(
        &self,
        subject: &str,
        predicate: &str,
        object: &str,
    ) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self
            .consensus
            .delete_triple(
                subject.to_string(),
                predicate.to_string(),
                object.to_string(),
            )
            .await?;

        Ok(response)
    }

    /// Clears the entire store through consensus (leader-only).
    pub async fn clear_store(&self) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self.consensus.clear_store().await?;
        Ok(response)
    }

    /// Begins a distributed transaction (leader-only) and returns its UUID id.
    pub async fn begin_transaction(&self) -> Result<String> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let tx_id = uuid::Uuid::new_v4().to_string();
        let _response = self.consensus.begin_transaction(tx_id.clone()).await?;

        Ok(tx_id)
    }

    /// Commits the transaction `tx_id` through consensus (leader-only).
    pub async fn commit_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self.consensus.commit_transaction(tx_id.to_string()).await?;
        Ok(response)
    }

    /// Rolls back the transaction `tx_id` through consensus (leader-only).
    pub async fn rollback_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self
            .consensus
            .rollback_transaction(tx_id.to_string())
            .await?;
        Ok(response)
    }

    /// Pattern-matches triples locally via consensus state; `None` components
    /// act as wildcards. Allowed on any node (read path).
    pub async fn query_triples(
        &self,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Vec<(String, String, String)> {
        self.consensus.query(subject, predicate, object).await
    }

    /// Executes a SPARQL query and flattens each binding row into a
    /// `"var: value, var: value"` string.
    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
        let bindings = self
            .query_executor
            .execute_query(sparql)
            .await
            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))?;

        let results = bindings
            .into_iter()
            .map(|binding| {
                let vars: Vec<String> = binding
                    .variables
                    .into_iter()
                    .map(|(var, val)| format!("{var}: {val}"))
                    .collect();
                vars.join(", ")
            })
            .collect();

        Ok(results)
    }

    /// Executes a SPARQL query and returns the raw result bindings.
    pub async fn query_sparql_bindings(&self, sparql: &str) -> Result<Vec<ResultBinding>> {
        self.query_executor
            .execute_query(sparql)
            .await
            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))
    }

    /// Returns per-query execution statistics from the query executor.
    pub async fn get_query_statistics(
        &self,
    ) -> Result<std::collections::HashMap<String, distributed_query::QueryStats>> {
        Ok(self.query_executor.get_statistics().await)
    }

    /// Clears the distributed query result cache.
    pub async fn clear_query_cache(&self) -> Result<()> {
        self.query_executor.clear_cache().await;
        Ok(())
    }

    /// Number of triples in the store (via consensus state).
    pub async fn len(&self) -> usize {
        self.consensus.len().await
    }

    /// Whether the store currently holds no triples.
    pub async fn is_empty(&self) -> bool {
        self.consensus.is_empty().await
    }

    /// Adds a node to the local view of the cluster (config, discovery,
    /// replication, query routing, consensus peer list) — no consensus round;
    /// see `add_node_with_consensus` for the replicated variant.
    pub async fn add_cluster_node(
        &mut self,
        node_id: OxirsNodeId,
        address: SocketAddr,
    ) -> Result<()> {
        if node_id == self.config.node_id {
            return Err(ClusterError::Config(
                "Cannot add self to cluster".to_string(),
            ));
        }

        self.config.add_peer(node_id);

        let node_info = NodeInfo::new(node_id, address);
        self.discovery.add_node(node_info);

        self.replication.add_replica(node_id, address.to_string());

        self.query_executor.add_node(node_id).await;

        self.consensus.add_peer(node_id);

        tracing::info!("Added node {} at {} to cluster", node_id, address);

        Ok(())
    }

    /// Removes a node from the local view of the cluster; counterpart of
    /// `add_cluster_node`. Refuses to remove self.
    pub async fn remove_cluster_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
        if node_id == self.config.node_id {
            return Err(ClusterError::Config(
                "Cannot remove self from cluster".to_string(),
            ));
        }

        self.config.peers.retain(|&id| id != node_id);

        self.discovery.remove_node(node_id);

        self.replication.remove_replica(node_id);

        self.query_executor.remove_node(node_id).await;

        self.consensus.remove_peer(node_id);

        tracing::info!("Removed node {} from cluster", node_id);

        Ok(())
    }

    /// Aggregates consensus, discovery, replication and (optional) region
    /// state into a [`ClusterStatus`] snapshot.
    pub async fn get_status(&self) -> ClusterStatus {
        let consensus_status = self.consensus.get_status().await;
        let discovery_stats = self.discovery.get_stats().clone();
        let replication_stats = self.replication.get_stats().clone();

        let region_status = if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region().to_string();
            let availability_zone_id = region_manager.get_local_availability_zone().to_string();
            let regional_peers = region_manager.get_nodes_in_region(&region_id).await;
            let topology = region_manager.get_topology().await;
            let monitoring_active = region_manager.is_monitoring_active().await;

            Some(RegionStatus {
                region_id,
                availability_zone_id,
                regional_peer_count: regional_peers.len(),
                total_regions: topology.regions.len(),
                monitoring_active,
            })
        } else {
            None
        };

        ClusterStatus {
            node_id: self.config.node_id,
            address: self.config.address,
            is_leader: consensus_status.is_leader,
            current_term: consensus_status.current_term,
            peer_count: consensus_status.peer_count,
            triple_count: consensus_status.triple_count,
            discovery_stats,
            replication_stats,
            is_running: *self.running.read().await,
            region_status,
        }
    }

    /// Spawns background maintenance tasks for discovery and replication.
    ///
    /// NOTE(review): this constructs a *fresh* `DiscoveryService` and a
    /// *fresh* `ReplicationManager` for the spawned tasks instead of reusing
    /// `self.discovery` / `self.replication`; the new instances do not share
    /// state with the ones started in `start()` — confirm this is intentional.
    async fn start_background_tasks(&mut self) {
        let running = Arc::clone(&self.running);

        let discovery_config = self.config.discovery.clone().unwrap_or_default();
        let mut discovery_clone =
            DiscoveryService::new(self.config.node_id, self.config.address, discovery_config);

        // Discovery maintenance loop: ticks every second until `running`
        // flips to false.
        tokio::spawn(async move {
            while *running.read().await {
                discovery_clone.run_periodic_tasks().await;
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });

        let mut replication_clone = ReplicationManager::with_raft_consensus(self.config.node_id);
        let running_clone = Arc::clone(&self.running);

        // NOTE(review): unlike the discovery loop above this is an `if`, not a
        // `while` — replication maintenance runs at most once per start().
        // Confirm whether `run_maintenance` loops internally or this should repeat.
        tokio::spawn(async move {
            if *running_clone.read().await {
                replication_clone.run_maintenance().await;
            }
        });
    }

    /// Adds a node via a consensus round, then mirrors the membership change
    /// into the local config/discovery/replication/query layers.
    pub async fn add_node_with_consensus(
        &mut self,
        node_id: OxirsNodeId,
        address: SocketAddr,
    ) -> Result<()> {
        self.consensus
            .add_node_with_consensus(node_id, address.to_string())
            .await
            .map_err(|e| {
                ClusterError::Other(format!("Failed to add node through consensus: {e}"))
            })?;

        self.config.add_peer(node_id);

        let node_info = NodeInfo::new(node_id, address);
        self.discovery.add_node(node_info);
        self.replication.add_replica(node_id, address.to_string());
        self.query_executor.add_node(node_id).await;

        Ok(())
    }

    /// Removes a node via a consensus round, then mirrors the change locally.
    pub async fn remove_node_with_consensus(&mut self, node_id: OxirsNodeId) -> Result<()> {
        self.consensus
            .remove_node_with_consensus(node_id)
            .await
            .map_err(|e| {
                ClusterError::Other(format!("Failed to remove node through consensus: {e}"))
            })?;

        self.config.peers.retain(|&id| id != node_id);

        self.discovery.remove_node(node_id);
        self.replication.remove_replica(node_id);
        self.query_executor.remove_node(node_id).await;

        Ok(())
    }

    /// Graceful shutdown: clears the running flag first (so background loops
    /// wind down), then shuts down consensus and stops discovery.
    pub async fn graceful_shutdown(&mut self) -> Result<()> {
        tracing::info!(
            "Initiating graceful shutdown of cluster node {}",
            self.config.node_id
        );

        {
            let mut running = self.running.write().await;
            *running = false;
        }

        self.consensus
            .graceful_shutdown()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to shutdown consensus: {e}")))?;

        self.discovery
            .stop()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery: {e}")))?;

        tracing::info!("Cluster node {} gracefully shutdown", self.config.node_id);
        Ok(())
    }

    /// Transfers consensus leadership to `target_node`, which must be a known peer.
    pub async fn transfer_leadership(&mut self, target_node: OxirsNodeId) -> Result<()> {
        if !self.config.peers.contains(&target_node) {
            return Err(ClusterError::Config(format!(
                "Target node {target_node} not in cluster"
            )));
        }

        self.consensus
            .transfer_leadership(target_node)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to transfer leadership: {e}")))?;

        Ok(())
    }

    /// Forcibly evicts `node_id` through consensus, then purges it from all
    /// local subsystems.
    pub async fn force_evict_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
        self.consensus
            .force_evict_node(node_id)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to force evict node: {e}")))?;

        self.config.peers.retain(|&id| id != node_id);
        self.discovery.remove_node(node_id);
        self.replication.remove_replica(node_id);
        self.query_executor.remove_node(node_id).await;

        Ok(())
    }

    /// Probes peer health via consensus and returns per-node health statuses.
    pub async fn check_cluster_health(&self) -> Result<Vec<consensus::NodeHealthStatus>> {
        self.consensus
            .check_peer_health()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to check cluster health: {e}")))
    }

    /// Asks the consensus layer to attempt cluster recovery.
    pub async fn attempt_recovery(&mut self) -> Result<()> {
        self.consensus
            .attempt_recovery()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to recover cluster: {e}")))?;

        tracing::info!(
            "Cluster recovery completed for node {}",
            self.config.node_id
        );
        Ok(())
    }

    /// This node's identifier.
    pub fn id(&self) -> OxirsNodeId {
        self.config.node_id
    }

    /// Triple count as a `Result` (infallible wrapper around `len`).
    pub async fn count_triples(&self) -> Result<usize> {
        Ok(self.len().await)
    }

    /// `true` when the node is running and not network-isolated.
    pub async fn is_active(&self) -> Result<bool> {
        Ok(*self.running.read().await && !*self.network_isolated.read().await)
    }

    /// Fault-injection: simulates a network partition for this node.
    pub async fn isolate_network(&self) -> Result<()> {
        let mut isolated = self.network_isolated.write().await;
        *isolated = true;
        tracing::info!("Node {} network isolated", self.config.node_id);
        Ok(())
    }

    /// Fault-injection: ends the simulated network partition.
    pub async fn restore_network(&self) -> Result<()> {
        let mut isolated = self.network_isolated.write().await;
        *isolated = false;
        tracing::info!("Node {} network restored", self.config.node_id);
        Ok(())
    }

    /// Fault-injection: marks this node as behaving byzantine. There is no
    /// corresponding disable method on this type.
    pub async fn enable_byzantine_mode(&self) -> Result<()> {
        let mut byzantine = self.byzantine_mode.write().await;
        *byzantine = true;
        tracing::info!("Node {} Byzantine mode enabled", self.config.node_id);
        Ok(())
    }

    /// Whether byzantine mode has been enabled on this node.
    pub async fn is_byzantine(&self) -> Result<bool> {
        Ok(*self.byzantine_mode.read().await)
    }

    /// The region manager, if multi-region operation is configured.
    pub fn region_manager(&self) -> Option<&Arc<RegionManager>> {
        self.region_manager.as_ref()
    }

    /// `true` when a region manager was constructed (multi-region mode).
    pub fn is_multi_region_enabled(&self) -> bool {
        self.region_manager.is_some()
    }

    /// Local region id, if multi-region is configured.
    pub fn get_region_id(&self) -> Option<String> {
        self.region_manager
            .as_ref()
            .map(|rm| rm.get_local_region().to_string())
    }

    /// Local availability-zone id, if multi-region is configured.
    pub fn get_availability_zone_id(&self) -> Option<String> {
        self.region_manager
            .as_ref()
            .map(|rm| rm.get_local_availability_zone().to_string())
    }

    /// Nodes in this node's region.
    ///
    /// # Errors
    /// `ClusterError::Config` when multi-region is not configured.
    pub async fn get_regional_peers(&self) -> Result<Vec<OxirsNodeId>> {
        if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region();
            Ok(region_manager.get_nodes_in_region(region_id).await)
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Leader candidates for this region; falls back to the full peer list
    /// when multi-region is not configured.
    pub async fn get_regional_leader_candidates(&self) -> Result<Vec<OxirsNodeId>> {
        if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region();
            Ok(region_manager.get_leader_candidates(region_id).await)
        } else {
            Ok(self.config.peers.clone())
        }
    }

    /// Target regions for cross-region replication; empty when multi-region
    /// is not configured.
    pub async fn get_cross_region_replication_targets(&self) -> Result<Vec<String>> {
        if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region();
            region_manager
                .calculate_replication_targets(region_id)
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to calculate replication targets: {e}"))
                })
        } else {
            Ok(Vec::new())
        }
    }

    /// Triggers one round of inter-region latency monitoring; no-op when
    /// multi-region is not configured.
    pub async fn monitor_region_latencies(&self) -> Result<()> {
        if let Some(region_manager) = &self.region_manager {
            region_manager.monitor_latencies().await.map_err(|e| {
                ClusterError::Other(format!("Failed to monitor region latencies: {e}"))
            })
        } else {
            Ok(())
        }
    }

    /// Health report for `region_id` (requires multi-region mode).
    pub async fn get_region_health(&self, region_id: &str) -> Result<region_manager::RegionHealth> {
        if let Some(region_manager) = &self.region_manager {
            region_manager
                .get_region_health(region_id)
                .await
                .map_err(|e| ClusterError::Other(format!("Failed to get region health: {e}")))
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Fails `failed_region` over to `target_region` (requires multi-region mode).
    pub async fn perform_region_failover(
        &self,
        failed_region: &str,
        target_region: &str,
    ) -> Result<()> {
        if let Some(region_manager) = &self.region_manager {
            region_manager
                .perform_region_failover(failed_region, target_region)
                .await
                .map_err(|e| ClusterError::Other(format!("Failed to perform region failover: {e}")))
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Current region topology snapshot (requires multi-region mode).
    pub async fn get_region_topology(&self) -> Result<region_manager::RegionTopology> {
        if let Some(region_manager) = &self.region_manager {
            Ok(region_manager.get_topology().await)
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Registers a node with the region manager under the given placement
    /// (requires multi-region mode).
    pub async fn add_node_to_region(
        &self,
        node_id: OxirsNodeId,
        region_id: String,
        availability_zone_id: String,
        data_center: Option<String>,
        rack: Option<String>,
    ) -> Result<()> {
        if let Some(region_manager) = &self.region_manager {
            region_manager
                .register_node(node_id, region_id, availability_zone_id, data_center, rack)
                .await
                .map_err(|e| ClusterError::Other(format!("Failed to add node to region: {e}")))
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Shared handle to the conflict resolver.
    pub fn conflict_resolver(&self) -> &Arc<ConflictResolver> {
        &self.conflict_resolver
    }

    /// Snapshot of the local vector clock.
    pub async fn get_vector_clock(&self) -> VectorClock {
        self.local_vector_clock.read().await.clone()
    }

    /// Merges a received vector clock into the local one, then ticks this
    /// node's entry (standard receive-then-increment causality update).
    pub async fn update_vector_clock(&self, received_clock: &VectorClock) {
        let mut clock = self.local_vector_clock.write().await;
        clock.update(received_clock);
        clock.increment(self.config.node_id);
    }

    /// Wraps an RDF operation with a fresh UUID, this node's id, the ticked
    /// vector clock, and the current wall-clock time.
    pub async fn create_timestamped_operation(
        &self,
        operation: conflict_resolution::RdfOperation,
        priority: u32,
    ) -> TimestampedOperation {
        let mut clock = self.local_vector_clock.write().await;
        clock.increment(self.config.node_id);

        TimestampedOperation {
            operation_id: uuid::Uuid::new_v4().to_string(),
            origin_node: self.config.node_id,
            vector_clock: clock.clone(),
            physical_time: std::time::SystemTime::now(),
            operation,
            priority,
        }
    }

    /// Detects conflicts among a set of timestamped operations.
    pub async fn detect_operation_conflicts(
        &self,
        operations: &[TimestampedOperation],
    ) -> Result<Vec<conflict_resolution::ConflictType>> {
        self.conflict_resolver
            .detect_conflicts(operations)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to detect conflicts: {e}")))
    }

    /// Resolves previously detected conflicts using the configured strategy.
    pub async fn resolve_operation_conflicts(
        &self,
        conflicts: &[conflict_resolution::ConflictType],
    ) -> Result<Vec<conflict_resolution::ResolutionResult>> {
        self.conflict_resolver
            .resolve_conflicts(conflicts)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to resolve conflicts: {e}")))
    }

    /// Timestamps an operation (ticking the vector clock) and applies it.
    ///
    /// NOTE(review): the timestamped wrapper is currently discarded (only the
    /// clock tick side effect is kept), an `Update` is applied as a
    /// non-atomic delete-then-insert, and `Batch` is a no-op returning
    /// `Success` — confirm these are intentional placeholders.
    pub async fn submit_conflict_aware_operation(
        &self,
        operation: conflict_resolution::RdfOperation,
        priority: u32,
    ) -> Result<RdfResponse> {
        let _timestamped_op = self
            .create_timestamped_operation(operation.clone(), priority)
            .await;

        match operation {
            conflict_resolution::RdfOperation::Insert {
                subject,
                predicate,
                object,
                ..
            } => self.insert_triple(&subject, &predicate, &object).await,
            conflict_resolution::RdfOperation::Delete {
                subject,
                predicate,
                object,
                ..
            } => self.delete_triple(&subject, &predicate, &object).await,
            conflict_resolution::RdfOperation::Clear { .. } => self.clear_store().await,
            conflict_resolution::RdfOperation::Update {
                old_triple,
                new_triple,
                ..
            } => {
                // Delete-then-insert; not atomic across the two consensus rounds.
                let _delete_result = self
                    .delete_triple(&old_triple.0, &old_triple.1, &old_triple.2)
                    .await?;
                self.insert_triple(&new_triple.0, &new_triple.1, &new_triple.2)
                    .await
            }
            conflict_resolution::RdfOperation::Batch { operations: _ } => {
                // Batch application not implemented yet; reports success.
                Ok(RdfResponse::Success)
            }
        }
    }

    /// Aggregated conflict-resolution statistics from the resolver.
    pub async fn get_conflict_resolution_statistics(
        &self,
    ) -> conflict_resolution::ResolutionStatistics {
        self.conflict_resolver.get_statistics().await
    }
}
1290
/// Point-in-time snapshot of a node's view of the cluster,
/// produced by [`ClusterNode::get_status`].
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// Identifier of the reporting node.
    pub node_id: OxirsNodeId,
    /// Address of the reporting node.
    pub address: SocketAddr,
    /// Whether the reporting node is the consensus leader.
    pub is_leader: bool,
    /// Current consensus term.
    pub current_term: u64,
    /// Number of known consensus peers.
    pub peer_count: usize,
    /// Number of triples in the store.
    pub triple_count: usize,
    /// Discovery-service statistics snapshot.
    pub discovery_stats: discovery::DiscoveryStats,
    /// Replication statistics snapshot.
    pub replication_stats: ReplicationStats,
    /// Whether the node was running when the snapshot was taken.
    pub is_running: bool,
    /// Region-level status; `None` in single-region deployments.
    pub region_status: Option<RegionStatus>,
}
1315
/// Region-level portion of a [`ClusterStatus`] snapshot
/// (only present in multi-region deployments).
#[derive(Debug, Clone)]
pub struct RegionStatus {
    /// Local region identifier.
    pub region_id: String,
    /// Local availability-zone identifier.
    pub availability_zone_id: String,
    /// Number of nodes in the local region.
    pub regional_peer_count: usize,
    /// Total number of regions in the topology.
    pub total_regions: usize,
    /// Whether region monitoring was active at snapshot time.
    pub monitoring_active: bool,
}
1330
/// Convenience facade over [`ClusterNode`] exposing a simpler
/// store-like API (insert/query/status) for callers that do not
/// need the full cluster-management surface.
pub struct DistributedStore {
    // The wrapped cluster node; all calls delegate to it.
    node: ClusterNode,
}
1335
1336impl DistributedStore {
1337 pub async fn new(config: NodeConfig) -> Result<Self> {
1339 let node = ClusterNode::new(config).await?;
1340 Ok(Self { node })
1341 }
1342
1343 pub async fn start(&mut self) -> Result<()> {
1345 self.node.start().await
1346 }
1347
1348 pub async fn stop(&mut self) -> Result<()> {
1350 self.node.stop().await
1351 }
1352
1353 pub async fn insert_triple(
1355 &mut self,
1356 subject: &str,
1357 predicate: &str,
1358 object: &str,
1359 ) -> Result<()> {
1360 let _response = self.node.insert_triple(subject, predicate, object).await?;
1361 Ok(())
1362 }
1363
1364 pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
1366 self.node.query_sparql(sparql).await
1367 }
1368
1369 pub async fn query_pattern(
1371 &self,
1372 subject: Option<&str>,
1373 predicate: Option<&str>,
1374 object: Option<&str>,
1375 ) -> Vec<(String, String, String)> {
1376 self.node.query_triples(subject, predicate, object).await
1377 }
1378
1379 pub async fn get_status(&self) -> ClusterStatus {
1381 self.node.get_status().await
1382 }
1383}
1384
1385pub use consensus::ConsensusError;
1387pub use discovery::DiscoveryError;
1388pub use replication::ReplicationError;
1389
#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    // NodeConfig::new should populate defaults: per-node data dir, empty
    // peers, default discovery/replication, no multi-region config.
    #[tokio::test]
    async fn test_node_config_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr);

        assert_eq!(config.node_id, 1);
        assert_eq!(config.address, addr);
        assert_eq!(config.data_dir, "./data/node-1");
        assert!(config.peers.is_empty());
        assert!(config.discovery.is_some());
        assert!(config.replication_strategy.is_some());
        assert!(config.region_config.is_none());
    }

    // add_peer must deduplicate: re-adding peer 2 leaves the list unchanged.
    #[tokio::test]
    async fn test_node_config_add_peer() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut config = NodeConfig::new(1, addr);

        config.add_peer(2);
        config.add_peer(3);
        config.add_peer(2); // duplicate — must be ignored
        assert_eq!(config.peers, vec![2, 3]);
    }

    // add_peer must reject the node's own id.
    #[tokio::test]
    async fn test_node_config_no_self_peer() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut config = NodeConfig::new(1, addr);

        config.add_peer(1); // own id — must be ignored
        assert!(config.peers.is_empty());
    }

    // ClusterNode::new should succeed with a valid default config.
    #[tokio::test]
    async fn test_cluster_node_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr);

        let node = ClusterNode::new(config).await;
        assert!(node.is_ok());

        let node = node.unwrap();
        assert_eq!(node.config.node_id, 1);
        assert_eq!(node.config.address, addr);
    }

    // An empty data_dir must be rejected with a Config error.
    #[tokio::test]
    async fn test_cluster_node_empty_data_dir_error() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut config = NodeConfig::new(1, addr);
        config.data_dir = String::new();

        let result = ClusterNode::new(config).await;
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Data directory cannot be empty"));
        }
    }

    // DistributedStore::new delegates to ClusterNode::new and should succeed.
    #[tokio::test]
    async fn test_distributed_store_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr);

        let store = DistributedStore::new(config).await;
        assert!(store.is_ok());
    }

    // Sanity-check Display formatting of the main error variants.
    #[test]
    fn test_cluster_error_types() {
        let err = ClusterError::Config("test error".to_string());
        assert!(err.to_string().contains("Configuration error: test error"));

        let err = ClusterError::NotLeader;
        assert_eq!(err.to_string(), "Not the leader node");

        let err = ClusterError::Network("connection failed".to_string());
        assert!(err.to_string().contains("Network error: connection failed"));
    }
}