//! Distributed cluster layer: Raft-based consensus over an RDF triple store,
//! with node discovery, replication, distributed SPARQL query execution,
//! conflict resolution, and optional multi-region and edge-computing support.

#![allow(clippy::field_reassign_with_default)]
#![allow(clippy::single_match)]
#![allow(clippy::collapsible_if)]
#![allow(clippy::clone_on_copy)]
#![allow(clippy::type_complexity)]
#![allow(clippy::collapsible_match)]
#![allow(clippy::manual_clamp)]
#![allow(clippy::needless_range_loop)]
#![allow(clippy::or_fun_call)]
#![allow(clippy::if_same_then_else)]
#![allow(clippy::only_used_in_recursion)]
#![allow(clippy::new_without_default)]
#![allow(clippy::derivable_impls)]
#![allow(clippy::useless_conversion)]

use serde::{Deserialize, Serialize};
use std::net::SocketAddr;
use std::sync::Arc;
use tokio::sync::RwLock;

pub mod adaptive_leader_election;
pub mod advanced_partitioning;
pub mod advanced_storage;
pub mod alerting;
pub mod auto_scaling;
pub mod backup_restore;
pub mod circuit_breaker;
pub mod conflict_resolution;
pub mod consensus;
pub mod crash_recovery;
pub mod data_rebalancing;
pub mod disaster_recovery;
pub mod discovery;
pub mod distributed_query;
pub mod distributed_tracing;
pub mod edge_computing;
pub mod enhanced_node_discovery;
pub mod enhanced_snapshotting;
pub mod error;
pub mod failover;
pub mod federation;
pub mod health_monitor;
pub mod health_monitoring;
pub mod memory_optimization;
pub mod merkle_tree;
pub mod mvcc;
pub mod mvcc_storage;
pub mod network;
pub mod node_lifecycle;
pub mod node_status_tracker;
pub mod operational_transformation;
pub mod optimization;
pub mod partition_detection;
pub mod performance_metrics;
pub mod performance_monitor;
pub mod raft;
pub mod raft_optimization;
pub mod raft_profiling;
pub mod raft_state;
pub mod range_partitioning;
pub mod read_replica;
pub mod region_manager;
pub mod replication;
pub mod replication_lag_monitor;
pub mod rolling_upgrade;
pub mod visualization_dashboard;
pub mod zero_downtime_migration;
pub mod security;
pub mod serialization;
pub mod shard;
pub mod shard_manager;
pub mod shard_migration;
pub mod shard_routing;
pub mod split_brain_prevention;
pub mod storage;
pub mod strong_consistency;
pub mod tls;
pub mod transaction;
pub mod transaction_optimizer;

#[cfg(feature = "bft")]
pub mod bft;
#[cfg(feature = "bft")]
pub mod bft_consensus;
#[cfg(feature = "bft")]
pub mod bft_network;

pub use error::{ClusterError, Result};
pub use failover::{FailoverConfig, FailoverManager, FailoverStrategy, RecoveryAction};
pub use health_monitor::{HealthMonitor, HealthMonitorConfig, NodeHealth, SystemMetrics};

use conflict_resolution::{
    ConflictResolver, ResolutionStrategy, TimestampedOperation, VectorClock,
};
use consensus::ConsensusManager;
use discovery::{DiscoveryConfig, DiscoveryService, NodeInfo};
use distributed_query::{DistributedQueryExecutor, ResultBinding};
use edge_computing::{EdgeComputingManager, EdgeDeploymentStrategy, EdgeDeviceProfile};
use raft::{OxirsNodeId, RdfResponse};
use region_manager::{
    ConsensusStrategy as RegionConsensusStrategy, MultiRegionReplicationStrategy, Region,
    RegionManager,
};
use replication::{ReplicationManager, ReplicationStats, ReplicationStrategy};

/// Configuration for multi-region cluster deployments.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct MultiRegionConfig {
    /// Identifier of the region this node belongs to.
    pub region_id: String,
    /// Identifier of the availability zone within the region.
    pub availability_zone_id: String,
    /// Optional data center identifier.
    pub data_center: Option<String>,
    /// Optional rack identifier.
    pub rack: Option<String>,
    /// All regions participating in the cluster.
    pub regions: Vec<Region>,
    /// Consensus strategy to use across regions.
    pub consensus_strategy: RegionConsensusStrategy,
    /// Replication strategy to use across regions.
    pub replication_strategy: MultiRegionReplicationStrategy,
    /// Strategy for resolving conflicting cross-region writes.
    pub conflict_resolution_strategy: ResolutionStrategy,
    /// Optional edge computing configuration.
    pub edge_config: Option<EdgeComputingConfig>,
    /// Whether to enable region monitoring.
    pub enable_monitoring: bool,
}

/// Configuration for edge computing support.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct EdgeComputingConfig {
    /// Whether edge computing is enabled.
    pub enabled: bool,
    /// Profile describing this edge device's capabilities.
    pub device_profile: EdgeDeviceProfile,
    /// Strategy for deploying workloads to the edge.
    pub deployment_strategy: EdgeDeploymentStrategy,
    /// Whether to enable intelligent caching on the edge.
    pub enable_intelligent_caching: bool,
    /// Whether to enable network condition monitoring.
    pub enable_network_monitoring: bool,
}

/// Configuration for a single cluster node.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct NodeConfig {
    /// Unique identifier of this node.
    pub node_id: OxirsNodeId,
    /// Network address this node listens on.
    pub address: SocketAddr,
    /// Directory where this node stores its data.
    pub data_dir: String,
    /// Known peer node identifiers.
    pub peers: Vec<OxirsNodeId>,
    /// Optional discovery configuration.
    pub discovery: Option<DiscoveryConfig>,
    /// Optional replication strategy.
    pub replication_strategy: Option<ReplicationStrategy>,
    /// Whether to use Byzantine fault-tolerant consensus.
    #[cfg(feature = "bft")]
    pub use_bft: bool,
    /// Optional multi-region configuration.
    pub region_config: Option<MultiRegionConfig>,
}

impl NodeConfig {
    /// Create a node configuration with sensible defaults.
    pub fn new(node_id: OxirsNodeId, address: SocketAddr) -> Self {
        Self {
            node_id,
            address,
            data_dir: format!("./data/node-{node_id}"),
            peers: Vec::new(),
            discovery: Some(DiscoveryConfig::default()),
            replication_strategy: Some(ReplicationStrategy::default()),
            #[cfg(feature = "bft")]
            use_bft: false,
            region_config: None,
        }
    }

    /// Add a peer, ignoring duplicates and this node's own id.
    pub fn add_peer(&mut self, peer_id: OxirsNodeId) -> &mut Self {
        if !self.peers.contains(&peer_id) && peer_id != self.node_id {
            self.peers.push(peer_id);
        }
        self
    }

    /// Set the discovery configuration.
    pub fn with_discovery(mut self, discovery: DiscoveryConfig) -> Self {
        self.discovery = Some(discovery);
        self
    }

    /// Set the replication strategy.
    pub fn with_replication_strategy(mut self, strategy: ReplicationStrategy) -> Self {
        self.replication_strategy = Some(strategy);
        self
    }

    /// Enable or disable Byzantine fault-tolerant consensus.
    #[cfg(feature = "bft")]
    pub fn with_bft(mut self, enable: bool) -> Self {
        self.use_bft = enable;
        self
    }

    /// Set the multi-region configuration.
    pub fn with_multi_region(mut self, region_config: MultiRegionConfig) -> Self {
        self.region_config = Some(region_config);
        self
    }

    /// Whether multi-region support is configured.
    pub fn is_multi_region_enabled(&self) -> bool {
        self.region_config.is_some()
    }

    /// The configured region id, if multi-region is enabled.
    pub fn region_id(&self) -> Option<&str> {
        self.region_config
            .as_ref()
            .map(|config| config.region_id.as_str())
    }

    /// The configured availability zone id, if multi-region is enabled.
    pub fn availability_zone_id(&self) -> Option<&str> {
        self.region_config
            .as_ref()
            .map(|config| config.availability_zone_id.as_str())
    }
}
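
// Illustrative usage sketch (added; not from the original source): composing a
// NodeConfig with the builder methods above. The node id, port, and peer ids
// are placeholders.
//
//     let addr: std::net::SocketAddr = "127.0.0.1:7000".parse().unwrap();
//     let mut config = NodeConfig::new(1, addr)
//         .with_discovery(DiscoveryConfig::default())
//         .with_replication_strategy(ReplicationStrategy::default());
//     config.add_peer(2).add_peer(3);
//     assert!(!config.is_multi_region_enabled());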

/// A node participating in the distributed RDF cluster, wiring together
/// consensus, discovery, replication, and distributed query execution.
pub struct ClusterNode {
    config: NodeConfig,
    consensus: ConsensusManager,
    discovery: DiscoveryService,
    replication: ReplicationManager,
    query_executor: DistributedQueryExecutor,
    region_manager: Option<Arc<RegionManager>>,
    conflict_resolver: Arc<ConflictResolver>,
    #[allow(dead_code)]
    edge_manager: Option<Arc<EdgeComputingManager>>,
    local_vector_clock: Arc<RwLock<VectorClock>>,
    running: Arc<RwLock<bool>>,
    byzantine_mode: Arc<RwLock<bool>>,
    network_isolated: Arc<RwLock<bool>>,
}

impl ClusterNode {
    /// Create a new cluster node from the given configuration.
    pub async fn new(config: NodeConfig) -> Result<Self> {
        // Validate the configuration.
        if config.data_dir.is_empty() {
            return Err(ClusterError::Config(
                "Data directory cannot be empty".to_string(),
            ));
        }

        // Ensure the data directory exists.
        tokio::fs::create_dir_all(&config.data_dir)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to create data directory: {e}")))?;

        let consensus = ConsensusManager::new(config.node_id, config.peers.clone());

        let discovery_config = config.discovery.clone().unwrap_or_default();
        let discovery = DiscoveryService::new(config.node_id, config.address, discovery_config);

        let replication_strategy = config.replication_strategy.clone().unwrap_or_default();
        let replication = ReplicationManager::new(replication_strategy, config.node_id);

        let query_executor = DistributedQueryExecutor::new(config.node_id);

        // Take the conflict-resolution strategy from the region config, if any.
        let default_resolution_strategy = if let Some(region_config) = &config.region_config {
            region_config.conflict_resolution_strategy.clone()
        } else {
            ResolutionStrategy::LastWriterWins
        };
        let conflict_resolver = Arc::new(ConflictResolver::new(default_resolution_strategy));

        // Initialize the local vector clock with this node's first tick.
        let mut vector_clock = VectorClock::new();
        vector_clock.increment(config.node_id);
        let local_vector_clock = Arc::new(RwLock::new(vector_clock));

        // Set up the region manager when multi-region is configured.
        let region_manager = if let Some(region_config) = &config.region_config {
            let manager = Arc::new(RegionManager::new(
                region_config.region_id.clone(),
                region_config.availability_zone_id.clone(),
                region_config.consensus_strategy.clone(),
                region_config.replication_strategy.clone(),
            ));

            manager
                .initialize(region_config.regions.clone())
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to initialize region manager: {e}"))
                })?;

            manager
                .register_node(
                    config.node_id,
                    region_config.region_id.clone(),
                    region_config.availability_zone_id.clone(),
                    region_config.data_center.clone(),
                    region_config.rack.clone(),
                )
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to register node in region manager: {e}"))
                })?;

            Some(manager)
        } else {
            None
        };

        // Set up the edge computing manager when enabled.
        let edge_manager = if let Some(region_config) = &config.region_config {
            if let Some(edge_config) = &region_config.edge_config {
                if edge_config.enabled {
                    let manager = Arc::new(EdgeComputingManager::new());

                    manager
                        .register_device(edge_config.device_profile.clone())
                        .await
                        .map_err(|e| {
                            ClusterError::Other(format!("Failed to register edge device: {e}"))
                        })?;

                    Some(manager)
                } else {
                    None
                }
            } else {
                None
            }
        } else {
            None
        };

        Ok(Self {
            config,
            consensus,
            discovery,
            replication,
            query_executor,
            region_manager,
            conflict_resolver,
            edge_manager,
            local_vector_clock,
            running: Arc::new(RwLock::new(false)),
            byzantine_mode: Arc::new(RwLock::new(false)),
            network_isolated: Arc::new(RwLock::new(false)),
        })
    }

    /// Start the node: discovery, peer wiring, consensus, and background tasks.
    pub async fn start(&mut self) -> Result<()> {
        // Mark the node as running; bail out if it already is.
        {
            let mut running = self.running.write().await;
            if *running {
                return Ok(());
            }
            *running = true;
        }

        tracing::info!(
            "Starting cluster node {} at {} with {} peers",
            self.config.node_id,
            self.config.address,
            self.config.peers.len()
        );

        self.discovery
            .start()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to start discovery service: {e}")))?;

        let discovered_nodes = self
            .discovery
            .discover_nodes()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to discover nodes: {e}")))?;

        // Wire discovered peers into replication and query execution.
        for node in discovered_nodes {
            if node.node_id != self.config.node_id {
                self.replication
                    .add_replica(node.node_id, node.address.to_string());
                self.query_executor.add_node(node.node_id).await;
            }
        }

        self.consensus
            .init()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to initialize consensus: {e}")))?;

        tracing::info!("Cluster node {} started successfully", self.config.node_id);

        self.start_background_tasks().await;

        Ok(())
    }

    /// Stop the node and its discovery service.
    pub async fn stop(&mut self) -> Result<()> {
        let mut running = self.running.write().await;
        if !*running {
            return Ok(());
        }

        tracing::info!("Stopping cluster node {}", self.config.node_id);

        self.discovery
            .stop()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery service: {e}")))?;

        *running = false;

        tracing::info!("Cluster node {} stopped", self.config.node_id);

        Ok(())
    }

    /// Whether this node is currently the consensus leader.
    pub async fn is_leader(&self) -> bool {
        self.consensus.is_leader().await
    }

    /// The current Raft term.
    pub async fn current_term(&self) -> u64 {
        self.consensus.current_term().await
    }

    /// Insert a triple through consensus. Requires leadership.
    pub async fn insert_triple(
        &self,
        subject: &str,
        predicate: &str,
        object: &str,
    ) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self
            .consensus
            .insert_triple(
                subject.to_string(),
                predicate.to_string(),
                object.to_string(),
            )
            .await?;

        Ok(response)
    }

    /// Delete a triple through consensus. Requires leadership.
    pub async fn delete_triple(
        &self,
        subject: &str,
        predicate: &str,
        object: &str,
    ) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self
            .consensus
            .delete_triple(
                subject.to_string(),
                predicate.to_string(),
                object.to_string(),
            )
            .await?;

        Ok(response)
    }

    /// Clear the entire store through consensus. Requires leadership.
    pub async fn clear_store(&self) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self.consensus.clear_store().await?;
        Ok(response)
    }

    /// Begin a distributed transaction, returning its id. Requires leadership.
    pub async fn begin_transaction(&self) -> Result<String> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let tx_id = uuid::Uuid::new_v4().to_string();
        let _response = self.consensus.begin_transaction(tx_id.clone()).await?;

        Ok(tx_id)
    }

    /// Commit a distributed transaction. Requires leadership.
    pub async fn commit_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self.consensus.commit_transaction(tx_id.to_string()).await?;
        Ok(response)
    }

    /// Roll back a distributed transaction. Requires leadership.
    pub async fn rollback_transaction(&self, tx_id: &str) -> Result<RdfResponse> {
        if !self.is_leader().await {
            return Err(ClusterError::NotLeader);
        }

        let response = self
            .consensus
            .rollback_transaction(tx_id.to_string())
            .await?;
        Ok(response)
    }

    /// Query triples by optional subject/predicate/object pattern.
    pub async fn query_triples(
        &self,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Vec<(String, String, String)> {
        self.consensus.query(subject, predicate, object).await
    }

    /// Execute a SPARQL query and render each result binding as a string.
    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
        let bindings = self
            .query_executor
            .execute_query(sparql)
            .await
            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))?;

        // Render each binding as comma-separated "var: value" pairs.
        let results = bindings
            .into_iter()
            .map(|binding| {
                let vars: Vec<String> = binding
                    .variables
                    .into_iter()
                    .map(|(var, val)| format!("{var}: {val}"))
                    .collect();
                vars.join(", ")
            })
            .collect();

        Ok(results)
    }

    /// Execute a SPARQL query and return the raw result bindings.
    pub async fn query_sparql_bindings(&self, sparql: &str) -> Result<Vec<ResultBinding>> {
        self.query_executor
            .execute_query(sparql)
            .await
            .map_err(|e| ClusterError::Other(format!("Query execution failed: {e}")))
    }

    /// Per-query execution statistics from the distributed query executor.
    pub async fn get_query_statistics(
        &self,
    ) -> Result<std::collections::HashMap<String, distributed_query::QueryStats>> {
        Ok(self.query_executor.get_statistics().await)
    }

    /// Clear the distributed query cache.
    pub async fn clear_query_cache(&self) -> Result<()> {
        self.query_executor.clear_cache().await;
        Ok(())
    }

    /// Number of triples in the store.
    pub async fn len(&self) -> usize {
        self.consensus.len().await
    }

    /// Whether the store contains no triples.
    pub async fn is_empty(&self) -> bool {
        self.consensus.is_empty().await
    }

    /// Add a node to the cluster, wiring it into discovery, replication,
    /// query execution, and consensus.
    pub async fn add_cluster_node(
        &mut self,
        node_id: OxirsNodeId,
        address: SocketAddr,
    ) -> Result<()> {
        if node_id == self.config.node_id {
            return Err(ClusterError::Config(
                "Cannot add self to cluster".to_string(),
            ));
        }

        self.config.add_peer(node_id);

        let node_info = NodeInfo::new(node_id, address);
        self.discovery.add_node(node_info);

        self.replication.add_replica(node_id, address.to_string());

        self.query_executor.add_node(node_id).await;

        self.consensus.add_peer(node_id);

        tracing::info!("Added node {} at {} to cluster", node_id, address);

        Ok(())
    }

    /// Remove a node from the cluster and all subsystems.
    pub async fn remove_cluster_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
        if node_id == self.config.node_id {
            return Err(ClusterError::Config(
                "Cannot remove self from cluster".to_string(),
            ));
        }

        self.config.peers.retain(|&id| id != node_id);

        self.discovery.remove_node(node_id);

        self.replication.remove_replica(node_id);

        self.query_executor.remove_node(node_id).await;

        self.consensus.remove_peer(node_id);

        tracing::info!("Removed node {} from cluster", node_id);

        Ok(())
    }

    /// Snapshot of the node's current status across subsystems.
    pub async fn get_status(&self) -> ClusterStatus {
        let consensus_status = self.consensus.get_status().await;
        let discovery_stats = self.discovery.get_stats().clone();
        let replication_stats = self.replication.get_stats().clone();

        let region_status = if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region().to_string();
            let availability_zone_id = region_manager.get_local_availability_zone().to_string();
            let regional_peers = region_manager.get_nodes_in_region(&region_id).await;
            let topology = region_manager.get_topology().await;

            Some(RegionStatus {
                region_id,
                availability_zone_id,
                regional_peer_count: regional_peers.len(),
                total_regions: topology.regions.len(),
                monitoring_active: true,
            })
        } else {
            None
        };

        ClusterStatus {
            node_id: self.config.node_id,
            address: self.config.address,
            is_leader: consensus_status.is_leader,
            current_term: consensus_status.current_term,
            peer_count: consensus_status.peer_count,
            triple_count: consensus_status.triple_count,
            discovery_stats,
            replication_stats,
            is_running: *self.running.read().await,
            region_status,
        }
    }

    /// Spawn background tasks for periodic discovery and replication maintenance.
    async fn start_background_tasks(&mut self) {
        let running = Arc::clone(&self.running);

        // Periodic discovery task.
        let discovery_config = self.config.discovery.clone().unwrap_or_default();
        let mut discovery_clone =
            DiscoveryService::new(self.config.node_id, self.config.address, discovery_config);

        tokio::spawn(async move {
            while *running.read().await {
                discovery_clone.run_periodic_tasks().await;
                tokio::time::sleep(std::time::Duration::from_secs(1)).await;
            }
        });

        // Replication maintenance task.
        let mut replication_clone = ReplicationManager::with_raft_consensus(self.config.node_id);
        let running_clone = Arc::clone(&self.running);

        tokio::spawn(async move {
            if *running_clone.read().await {
                replication_clone.run_maintenance().await;
            }
        });
    }

    /// Add a node to the cluster through consensus, then wire it into
    /// discovery, replication, and query execution.
    pub async fn add_node_with_consensus(
        &mut self,
        node_id: OxirsNodeId,
        address: SocketAddr,
    ) -> Result<()> {
        self.consensus
            .add_node_with_consensus(node_id, address.to_string())
            .await
            .map_err(|e| {
                ClusterError::Other(format!("Failed to add node through consensus: {e}"))
            })?;

        self.config.add_peer(node_id);

        let node_info = NodeInfo::new(node_id, address);
        self.discovery.add_node(node_info);
        self.replication.add_replica(node_id, address.to_string());
        self.query_executor.add_node(node_id).await;

        Ok(())
    }

    /// Remove a node from the cluster through consensus.
    pub async fn remove_node_with_consensus(&mut self, node_id: OxirsNodeId) -> Result<()> {
        self.consensus
            .remove_node_with_consensus(node_id)
            .await
            .map_err(|e| {
                ClusterError::Other(format!("Failed to remove node through consensus: {e}"))
            })?;

        self.config.peers.retain(|&id| id != node_id);

        self.discovery.remove_node(node_id);
        self.replication.remove_replica(node_id);
        self.query_executor.remove_node(node_id).await;

        Ok(())
    }

    /// Gracefully shut down the node: stop accepting work, then shut down
    /// consensus and discovery.
    pub async fn graceful_shutdown(&mut self) -> Result<()> {
        tracing::info!(
            "Initiating graceful shutdown of cluster node {}",
            self.config.node_id
        );

        {
            let mut running = self.running.write().await;
            *running = false;
        }

        self.consensus
            .graceful_shutdown()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to shutdown consensus: {e}")))?;

        self.discovery
            .stop()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to stop discovery: {e}")))?;

        tracing::info!("Cluster node {} gracefully shutdown", self.config.node_id);
        Ok(())
    }

    /// Transfer leadership to the given peer.
    pub async fn transfer_leadership(&mut self, target_node: OxirsNodeId) -> Result<()> {
        if !self.config.peers.contains(&target_node) {
            return Err(ClusterError::Config(format!(
                "Target node {target_node} not in cluster"
            )));
        }

        self.consensus
            .transfer_leadership(target_node)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to transfer leadership: {e}")))?;

        Ok(())
    }

    /// Forcibly evict a node from the cluster and all subsystems.
    pub async fn force_evict_node(&mut self, node_id: OxirsNodeId) -> Result<()> {
        self.consensus
            .force_evict_node(node_id)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to force evict node: {e}")))?;

        self.config.peers.retain(|&id| id != node_id);
        self.discovery.remove_node(node_id);
        self.replication.remove_replica(node_id);
        self.query_executor.remove_node(node_id).await;

        Ok(())
    }

    /// Check the health of all peers via consensus.
    pub async fn check_cluster_health(&self) -> Result<Vec<consensus::NodeHealthStatus>> {
        self.consensus
            .check_peer_health()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to check cluster health: {e}")))
    }

    /// Attempt to recover the cluster after a failure.
    pub async fn attempt_recovery(&mut self) -> Result<()> {
        self.consensus
            .attempt_recovery()
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to recover cluster: {e}")))?;

        tracing::info!(
            "Cluster recovery completed for node {}",
            self.config.node_id
        );
        Ok(())
    }

    /// This node's identifier.
    pub fn id(&self) -> OxirsNodeId {
        self.config.node_id
    }

    /// Count the triples stored on this node.
    pub async fn count_triples(&self) -> Result<usize> {
        Ok(self.len().await)
    }

    /// Whether the node is running and not network-isolated.
    pub async fn is_active(&self) -> Result<bool> {
        Ok(*self.running.read().await && !*self.network_isolated.read().await)
    }

    /// Mark this node as network-isolated (e.g., to simulate a partition).
    pub async fn isolate_network(&self) -> Result<()> {
        let mut isolated = self.network_isolated.write().await;
        *isolated = true;
        tracing::info!("Node {} network isolated", self.config.node_id);
        Ok(())
    }

    /// Clear the network-isolated flag.
    pub async fn restore_network(&self) -> Result<()> {
        let mut isolated = self.network_isolated.write().await;
        *isolated = false;
        tracing::info!("Node {} network restored", self.config.node_id);
        Ok(())
    }

    /// Mark this node as exhibiting Byzantine behavior.
    pub async fn enable_byzantine_mode(&self) -> Result<()> {
        let mut byzantine = self.byzantine_mode.write().await;
        *byzantine = true;
        tracing::info!("Node {} Byzantine mode enabled", self.config.node_id);
        Ok(())
    }

    /// Whether Byzantine mode is enabled.
    pub async fn is_byzantine(&self) -> Result<bool> {
        Ok(*self.byzantine_mode.read().await)
    }

    /// The region manager, if multi-region is configured.
    pub fn region_manager(&self) -> Option<&Arc<RegionManager>> {
        self.region_manager.as_ref()
    }

    /// Whether multi-region support is active on this node.
    pub fn is_multi_region_enabled(&self) -> bool {
        self.region_manager.is_some()
    }

    /// The local region id, if multi-region is configured.
    pub fn get_region_id(&self) -> Option<String> {
        self.region_manager
            .as_ref()
            .map(|rm| rm.get_local_region().to_string())
    }

    /// The local availability zone id, if multi-region is configured.
    pub fn get_availability_zone_id(&self) -> Option<String> {
        self.region_manager
            .as_ref()
            .map(|rm| rm.get_local_availability_zone().to_string())
    }

    /// Peers located in the same region as this node.
    pub async fn get_regional_peers(&self) -> Result<Vec<OxirsNodeId>> {
        if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region();
            Ok(region_manager.get_nodes_in_region(region_id).await)
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Leader candidates for the local region; falls back to all peers when
    /// multi-region is not configured.
    pub async fn get_regional_leader_candidates(&self) -> Result<Vec<OxirsNodeId>> {
        if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region();
            Ok(region_manager.get_leader_candidates(region_id).await)
        } else {
            Ok(self.config.peers.clone())
        }
    }

    /// Regions this node should replicate to; empty when multi-region is
    /// not configured.
    pub async fn get_cross_region_replication_targets(&self) -> Result<Vec<String>> {
        if let Some(region_manager) = &self.region_manager {
            let region_id = region_manager.get_local_region();
            region_manager
                .calculate_replication_targets(region_id)
                .await
                .map_err(|e| {
                    ClusterError::Other(format!("Failed to calculate replication targets: {e}"))
                })
        } else {
            Ok(Vec::new())
        }
    }

    /// Trigger a round of inter-region latency monitoring; a no-op when
    /// multi-region is not configured.
    pub async fn monitor_region_latencies(&self) -> Result<()> {
        if let Some(region_manager) = &self.region_manager {
            region_manager.monitor_latencies().await.map_err(|e| {
                ClusterError::Other(format!("Failed to monitor region latencies: {e}"))
            })
        } else {
            Ok(())
        }
    }

    /// Health of the given region.
    pub async fn get_region_health(&self, region_id: &str) -> Result<region_manager::RegionHealth> {
        if let Some(region_manager) = &self.region_manager {
            region_manager
                .get_region_health(region_id)
                .await
                .map_err(|e| ClusterError::Other(format!("Failed to get region health: {e}")))
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Fail traffic over from one region to another.
    pub async fn perform_region_failover(
        &self,
        failed_region: &str,
        target_region: &str,
    ) -> Result<()> {
        if let Some(region_manager) = &self.region_manager {
            region_manager
                .perform_region_failover(failed_region, target_region)
                .await
                .map_err(|e| ClusterError::Other(format!("Failed to perform region failover: {e}")))
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// The current multi-region topology.
    pub async fn get_region_topology(&self) -> Result<region_manager::RegionTopology> {
        if let Some(region_manager) = &self.region_manager {
            Ok(region_manager.get_topology().await)
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// Register a node with the region manager under the given placement.
    pub async fn add_node_to_region(
        &self,
        node_id: OxirsNodeId,
        region_id: String,
        availability_zone_id: String,
        data_center: Option<String>,
        rack: Option<String>,
    ) -> Result<()> {
        if let Some(region_manager) = &self.region_manager {
            region_manager
                .register_node(node_id, region_id, availability_zone_id, data_center, rack)
                .await
                .map_err(|e| ClusterError::Other(format!("Failed to add node to region: {e}")))
        } else {
            Err(ClusterError::Config(
                "Multi-region not configured".to_string(),
            ))
        }
    }

    /// The conflict resolver used by this node.
    pub fn conflict_resolver(&self) -> &Arc<ConflictResolver> {
        &self.conflict_resolver
    }

    /// A snapshot of this node's current vector clock.
    pub async fn get_vector_clock(&self) -> VectorClock {
        self.local_vector_clock.read().await.clone()
    }

    /// Merge a received vector clock into the local one, then tick this node.
    pub async fn update_vector_clock(&self, received_clock: &VectorClock) {
        let mut clock = self.local_vector_clock.write().await;
        clock.update(received_clock);
        clock.increment(self.config.node_id);
    }

    /// Wrap an RDF operation with this node's vector clock, a fresh operation
    /// id, and a physical timestamp.
    pub async fn create_timestamped_operation(
        &self,
        operation: conflict_resolution::RdfOperation,
        priority: u32,
    ) -> TimestampedOperation {
        let mut clock = self.local_vector_clock.write().await;
        clock.increment(self.config.node_id);

        TimestampedOperation {
            operation_id: uuid::Uuid::new_v4().to_string(),
            origin_node: self.config.node_id,
            vector_clock: clock.clone(),
            physical_time: std::time::SystemTime::now(),
            operation,
            priority,
        }
    }

    /// Detect conflicts among a set of timestamped operations.
    pub async fn detect_operation_conflicts(
        &self,
        operations: &[TimestampedOperation],
    ) -> Result<Vec<conflict_resolution::ConflictType>> {
        self.conflict_resolver
            .detect_conflicts(operations)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to detect conflicts: {e}")))
    }

    /// Resolve previously detected conflicts with the configured strategy.
    pub async fn resolve_operation_conflicts(
        &self,
        conflicts: &[conflict_resolution::ConflictType],
    ) -> Result<Vec<conflict_resolution::ResolutionResult>> {
        self.conflict_resolver
            .resolve_conflicts(conflicts)
            .await
            .map_err(|e| ClusterError::Other(format!("Failed to resolve conflicts: {e}")))
    }

    /// Timestamp an RDF operation and apply it through consensus.
    pub async fn submit_conflict_aware_operation(
        &self,
        operation: conflict_resolution::RdfOperation,
        priority: u32,
    ) -> Result<RdfResponse> {
        // Timestamp the operation so peers can order and conflict-check it.
        let _timestamped_op = self
            .create_timestamped_operation(operation.clone(), priority)
            .await;

        // Apply the operation through the regular consensus paths.
        match operation {
            conflict_resolution::RdfOperation::Insert {
                subject,
                predicate,
                object,
                ..
            } => self.insert_triple(&subject, &predicate, &object).await,
            conflict_resolution::RdfOperation::Delete {
                subject,
                predicate,
                object,
                ..
            } => self.delete_triple(&subject, &predicate, &object).await,
            conflict_resolution::RdfOperation::Clear { .. } => self.clear_store().await,
            conflict_resolution::RdfOperation::Update {
                old_triple,
                new_triple,
                ..
            } => {
                // An update is a delete of the old triple followed by an insert.
                let _delete_result = self
                    .delete_triple(&old_triple.0, &old_triple.1, &old_triple.2)
                    .await?;
                self.insert_triple(&new_triple.0, &new_triple.1, &new_triple.2)
                    .await
            }
            conflict_resolution::RdfOperation::Batch { operations: _ } => {
                // Batched operations are acknowledged but not yet applied
                // individually here.
                Ok(RdfResponse::Success)
            }
        }
    }

    /// Statistics about conflict detection and resolution so far.
    pub async fn get_conflict_resolution_statistics(
        &self,
    ) -> conflict_resolution::ResolutionStatistics {
        self.conflict_resolver.get_statistics().await
    }
}

/// A point-in-time status snapshot for a cluster node.
#[derive(Debug, Clone)]
pub struct ClusterStatus {
    /// This node's identifier.
    pub node_id: OxirsNodeId,
    /// The address this node listens on.
    pub address: SocketAddr,
    /// Whether this node is the current leader.
    pub is_leader: bool,
    /// The current Raft term.
    pub current_term: u64,
    /// Number of known peers.
    pub peer_count: usize,
    /// Number of triples stored.
    pub triple_count: usize,
    /// Discovery service statistics.
    pub discovery_stats: discovery::DiscoveryStats,
    /// Replication statistics.
    pub replication_stats: ReplicationStats,
    /// Whether the node is running.
    pub is_running: bool,
    /// Multi-region status, if configured.
    pub region_status: Option<RegionStatus>,
}

/// Status of this node's region membership.
#[derive(Debug, Clone)]
pub struct RegionStatus {
    /// The local region id.
    pub region_id: String,
    /// The local availability zone id.
    pub availability_zone_id: String,
    /// Number of peers in the local region.
    pub regional_peer_count: usize,
    /// Total number of regions in the topology.
    pub total_regions: usize,
    /// Whether region monitoring is active.
    pub monitoring_active: bool,
}

/// A high-level distributed RDF store facade over a single cluster node.
pub struct DistributedStore {
    node: ClusterNode,
}

impl DistributedStore {
    /// Create a distributed store backed by a new cluster node.
    pub async fn new(config: NodeConfig) -> Result<Self> {
        let node = ClusterNode::new(config).await?;
        Ok(Self { node })
    }

    /// Start the underlying cluster node.
    pub async fn start(&mut self) -> Result<()> {
        self.node.start().await
    }

    /// Stop the underlying cluster node.
    pub async fn stop(&mut self) -> Result<()> {
        self.node.stop().await
    }

    /// Insert a triple, discarding the consensus response.
    pub async fn insert_triple(
        &mut self,
        subject: &str,
        predicate: &str,
        object: &str,
    ) -> Result<()> {
        let _response = self.node.insert_triple(subject, predicate, object).await?;
        Ok(())
    }

    /// Execute a SPARQL query.
    pub async fn query_sparql(&self, sparql: &str) -> Result<Vec<String>> {
        self.node.query_sparql(sparql).await
    }

    /// Query triples by optional pattern.
    pub async fn query_pattern(
        &self,
        subject: Option<&str>,
        predicate: Option<&str>,
        object: Option<&str>,
    ) -> Vec<(String, String, String)> {
        self.node.query_triples(subject, predicate, object).await
    }

    /// Status of the underlying cluster node.
    pub async fn get_status(&self) -> ClusterStatus {
        self.node.get_status().await
    }
}
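
// Illustrative lifecycle sketch (added; not from the original source). It uses
// only the DistributedStore API above; the address and pattern are placeholders,
// and it assumes the node has become leader before the write.
//
//     let addr: std::net::SocketAddr = "127.0.0.1:7000".parse().unwrap();
//     let mut store = DistributedStore::new(NodeConfig::new(1, addr)).await?;
//     store.start().await?;
//     store.insert_triple("<s>", "<p>", "<o>").await?;
//     let rows = store.query_pattern(Some("<s>"), None, None).await;
//     store.stop().await?;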

pub use consensus::ConsensusError;
pub use discovery::DiscoveryError;
pub use replication::ReplicationError;

#[cfg(test)]
mod tests {
    use super::*;
    use std::net::{IpAddr, Ipv4Addr};

    #[tokio::test]
    async fn test_node_config_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr);

        assert_eq!(config.node_id, 1);
        assert_eq!(config.address, addr);
        assert_eq!(config.data_dir, "./data/node-1");
        assert!(config.peers.is_empty());
        assert!(config.discovery.is_some());
        assert!(config.replication_strategy.is_some());
        assert!(config.region_config.is_none());
    }

    #[tokio::test]
    async fn test_node_config_add_peer() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut config = NodeConfig::new(1, addr);

        config.add_peer(2);
        config.add_peer(3);
        config.add_peer(2); // duplicate, should be ignored

        assert_eq!(config.peers, vec![2, 3]);
    }

    #[tokio::test]
    async fn test_node_config_no_self_peer() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut config = NodeConfig::new(1, addr);

        config.add_peer(1); // adding self should be ignored

        assert!(config.peers.is_empty());
    }
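
    // Added sketch test (not in the original): exercises the builder methods
    // and multi-region accessors defined on NodeConfig above, using only
    // defaults from this crate.
    #[tokio::test]
    async fn test_node_config_builder_defaults() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr)
            .with_discovery(DiscoveryConfig::default())
            .with_replication_strategy(ReplicationStrategy::default());

        assert!(config.discovery.is_some());
        assert!(config.replication_strategy.is_some());
        assert!(!config.is_multi_region_enabled());
        assert!(config.region_id().is_none());
        assert!(config.availability_zone_id().is_none());
    }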

    #[tokio::test]
    async fn test_cluster_node_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr);

        let node = ClusterNode::new(config).await;
        assert!(node.is_ok());

        let node = node.unwrap();
        assert_eq!(node.config.node_id, 1);
        assert_eq!(node.config.address, addr);
    }

    #[tokio::test]
    async fn test_cluster_node_empty_data_dir_error() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let mut config = NodeConfig::new(1, addr);
        config.data_dir = String::new();

        let result = ClusterNode::new(config).await;
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(e.to_string().contains("Data directory cannot be empty"));
        }
    }

    #[tokio::test]
    async fn test_distributed_store_creation() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let config = NodeConfig::new(1, addr);

        let store = DistributedStore::new(config).await;
        assert!(store.is_ok());
    }
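
    // Added sketch test (not in the original): checks the simple state
    // accessors on a freshly created, not-yet-started ClusterNode.
    #[tokio::test]
    async fn test_cluster_node_initial_state() {
        let addr = SocketAddr::new(IpAddr::V4(Ipv4Addr::new(127, 0, 0, 1)), 8080);
        let node = ClusterNode::new(NodeConfig::new(1, addr)).await.unwrap();

        assert_eq!(node.id(), 1);
        assert!(!node.is_multi_region_enabled());
        assert!(node.get_region_id().is_none());
        // Not started yet, so the node reports inactive and non-Byzantine.
        assert!(!node.is_active().await.unwrap());
        assert!(!node.is_byzantine().await.unwrap());
    }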

    #[test]
    fn test_cluster_error_types() {
        let err = ClusterError::Config("test error".to_string());
        assert!(err.to_string().contains("Configuration error: test error"));

        let err = ClusterError::NotLeader;
        assert_eq!(err.to_string(), "Not the leader node");

        let err = ClusterError::Network("connection failed".to_string());
        assert!(err.to_string().contains("Network error: connection failed"));
    }
}