1#![allow(dead_code)]
7
8use crate::model::{Triple, TriplePattern};
9use crate::OxirsError;
10use serde::{Deserialize, Serialize};
11use std::collections::{HashMap, VecDeque};
12use std::net::SocketAddr;
13use std::sync::Arc;
14use std::time::{Duration, Instant};
15use tokio::sync::{mpsc, Mutex, RwLock};
16use tokio::time::interval;
17
/// Top-level configuration for multi-region replication.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Identifier of the local region (e.g. "us-east-1").
    pub region_id: String,
    /// Static description of the local region.
    pub region: RegionConfig,
    /// Remote regions this node replicates with.
    pub peers: Vec<RegionPeer>,
    /// How writes are fanned out to peers.
    pub strategy: ReplicationStrategy,
    /// Policy applied when concurrent versions of a triple collide.
    pub conflict_resolution: ConflictResolution,
    /// Transport settings (timeouts, retries, compression, TLS).
    pub network: NetworkConfig,
    /// Write-ahead-log settings.
    pub persistence: PersistenceConfig,
}

/// Static description of a single region.
#[derive(Debug, Clone)]
pub struct RegionConfig {
    /// Human-readable region name.
    pub name: String,
    /// Physical location of the region.
    pub location: GeographicLocation,
    /// Availability zones within the region.
    pub availability_zones: Vec<String>,
    /// Provisioned capacity for the region.
    pub capacity: RegionCapacity,
}

/// Geographic coordinates and labels for a region.
#[derive(Debug, Clone)]
pub struct GeographicLocation {
    pub latitude: f64,
    pub longitude: f64,
    pub continent: String,
    pub country: String,
}

/// Provisioned throughput and storage for a region.
#[derive(Debug, Clone)]
pub struct RegionCapacity {
    /// Provisioned read capacity units.
    pub read_units: u32,
    /// Provisioned write capacity units.
    pub write_units: u32,
    /// Storage allowance in gigabytes.
    pub storage_gb: u32,
    /// Whether capacity may scale automatically.
    pub auto_scaling: bool,
}

/// A remote region this node replicates with.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct RegionPeer {
    /// Identifier of the peer region.
    pub region_id: String,
    /// Network endpoints for the peer.
    pub endpoints: Vec<SocketAddr>,
    /// Relative priority of this peer.
    pub priority: u32,
    /// Whether replication to this peer is enabled.
    pub active: bool,
}
88
/// Strategy controlling how writes are propagated to peer regions.
#[derive(Debug, Clone)]
pub enum ReplicationStrategy {
    /// Wait for every peer to acknowledge each write.
    SyncAll,
    /// Wait for `n` regions (the local region counts as one) to acknowledge.
    SyncQuorum { n: usize },
    /// Fire-and-forget to all peers.
    AsyncAll,
    /// Forward writes along a fixed chain of regions.
    Chain { order: Vec<String> },
    /// Replicate through a primary/secondary/edge hierarchy.
    Hierarchical { topology: ReplicationTopology },
    /// Let the system pick a strategy dynamically.
    Adaptive,
}

/// Region tiers used by `ReplicationStrategy::Hierarchical`.
#[derive(Debug, Clone)]
pub struct ReplicationTopology {
    pub primary: Vec<String>,
    pub secondary: Vec<String>,
    pub edge: Vec<String>,
}

/// Policy for resolving concurrent versions of the same triple.
#[derive(Debug, Clone)]
pub enum ConflictResolution {
    /// Keep the version with the newest wall-clock timestamp.
    LastWriteWins,
    /// Use vector-clock causality to pick a winner.
    VectorClock,
    /// Delegate to a named custom resolver.
    Custom(String),
    /// Keep all conflicting versions.
    MultiValue,
    /// Prefer the version from the higher-priority origin region.
    RegionPriority,
}
131
/// Transport-level settings for peer connections.
#[derive(Debug, Clone)]
pub struct NetworkConfig {
    /// Timeout for establishing a connection.
    pub connect_timeout: Duration,
    /// Timeout for an individual request.
    pub request_timeout: Duration,
    /// Maximum retry attempts per request.
    pub max_retries: u32,
    /// Whether payloads are compressed on the wire.
    pub compression: bool,
    /// TLS settings for peer links.
    pub encryption: EncryptionConfig,
}

/// TLS configuration for peer links.
#[derive(Debug, Clone)]
pub struct EncryptionConfig {
    pub tls_enabled: bool,
    /// Path to the certificate file, used when TLS is enabled.
    pub cert_path: Option<String>,
    /// Path to the private key file, used when TLS is enabled.
    pub key_path: Option<String>,
    /// Path to the CA bundle used to verify peers.
    pub ca_path: Option<String>,
}

/// Write-ahead-log persistence settings.
#[derive(Debug, Clone)]
pub struct PersistenceConfig {
    /// Directory where WAL files are stored.
    pub wal_path: String,
    /// How often the checkpoint task runs.
    pub checkpoint_interval: Duration,
    /// WAL size in bytes that triggers rotation.
    pub max_wal_size: usize,
    /// Whether WAL entries are compressed.
    pub wal_compression: bool,
}
172
/// A single unit of replication traffic exchanged between regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub enum ReplicationOp {
    /// Insert (or update) a versioned triple.
    Insert(VersionedTriple),
    /// Delete a versioned triple.
    Delete(VersionedTriple),
    /// Several ops shipped as a group.
    Batch(Vec<ReplicationOp>),
    /// One chunk of a bulk snapshot transfer.
    SnapshotChunk(SnapshotChunk),
    /// Liveness/lag report from a peer.
    Heartbeat(HeartbeatInfo),
}

/// A triple together with the version metadata needed for conflict detection.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct VersionedTriple {
    /// The RDF triple itself.
    pub triple: Triple,
    /// Vector clock captured when the write was made.
    pub version: VectorClock,
    /// Wall-clock seconds since the Unix epoch at write time.
    pub timestamp: u64,
    /// Region where the write originated.
    pub origin_region: String,
    /// Optional transaction identifier for the originating write.
    pub tx_id: Option<String>,
}
202
203#[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
205pub struct VectorClock {
206 pub entries: HashMap<String, u64>,
208}
209
210impl Default for VectorClock {
211 fn default() -> Self {
212 Self::new()
213 }
214}
215
216impl VectorClock {
217 pub fn new() -> Self {
219 VectorClock {
220 entries: HashMap::new(),
221 }
222 }
223
224 pub fn increment(&mut self, region: &str) {
226 let counter = self.entries.entry(region.to_string()).or_insert(0);
227 *counter += 1;
228 }
229
230 pub fn merge(&mut self, other: &VectorClock) {
232 for (region, &count) in &other.entries {
233 let entry = self.entries.entry(region.clone()).or_insert(0);
234 *entry = (*entry).max(count);
235 }
236 }
237
238 pub fn is_concurrent(&self, other: &VectorClock) -> bool {
240 !self.happens_before(other) && !other.happens_before(self)
241 }
242
243 pub fn happens_before(&self, other: &VectorClock) -> bool {
245 let mut all_leq = true;
246 let mut exists_lt = false;
247
248 for (region, &count) in &self.entries {
249 let other_count = other.entries.get(region).copied().unwrap_or(0);
250 if count > other_count {
251 all_leq = false;
252 break;
253 }
254 if count < other_count {
255 exists_lt = true;
256 }
257 }
258
259 for region in other.entries.keys() {
261 if !self.entries.contains_key(region) && other.entries[region] > 0 {
262 exists_lt = true;
263 }
264 }
265
266 all_leq && exists_lt
267 }
268}
269
/// One chunk of a snapshot being streamed between regions.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct SnapshotChunk {
    /// Identifier of the snapshot this chunk belongs to.
    pub snapshot_id: String,
    /// Zero-based index of this chunk.
    pub chunk_index: u64,
    /// Total number of chunks in the snapshot.
    pub total_chunks: u64,
    /// Raw chunk payload.
    pub data: Vec<u8>,
    /// Integrity checksum for the payload.
    pub checksum: String,
}

/// Periodic liveness and load report from a region.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct HeartbeatInfo {
    /// Region sending the heartbeat.
    pub region_id: String,
    /// Seconds since the Unix epoch when the heartbeat was produced.
    pub timestamp: u64,
    /// Resource-usage snapshot of the sender.
    pub load: LoadMetrics,
    /// Replication lag the sender observes, in milliseconds.
    pub lag_ms: u64,
}

/// Resource-usage snapshot carried in heartbeats.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct LoadMetrics {
    pub cpu_percent: f32,
    pub memory_percent: f32,
    pub disk_percent: f32,
    pub network_mbps: f32,
    /// Number of open connections.
    pub connections: u32,
}
312
/// Coordinates multi-region replication: local storage, peer bookkeeping,
/// network fan-out, conflict resolution, and write-ahead logging.
pub struct ReplicationManager {
    config: ReplicationConfig,
    /// Locally materialized triples plus recorded conflicts.
    storage: Arc<RwLock<ReplicationStorage>>,
    /// Mutable replication state (vector clock, per-peer bookkeeping).
    state: Arc<RwLock<ReplicationState>>,
    /// Peer connections and the incoming message queue.
    network: Arc<NetworkManager>,
    /// Picks winners between concurrent versions of the same triple.
    resolver: Arc<ConflictResolver>,
    /// Durable log written before a write is applied or shipped.
    wal: Arc<RwLock<WriteAheadLog>>,
    /// Running counters for observability.
    stats: Arc<RwLock<ReplicationStats>>,
}

/// In-memory triple store plus conflict bookkeeping.
#[allow(dead_code)]
struct ReplicationStorage {
    /// Current version of each stored triple.
    triples: HashMap<Triple, VersionedTriple>,
    /// Concurrent versions recorded whenever a conflict was detected.
    conflicts: HashMap<Triple, Vec<VersionedTriple>>,
    /// Ops queued but not yet processed (currently unused).
    #[allow(dead_code)]
    pending_ops: VecDeque<ReplicationOp>,
}

/// Mutable replication-wide state.
#[allow(dead_code)]
struct ReplicationState {
    /// This node's current vector clock.
    vector_clock: VectorClock,
    /// Bookkeeping for each known peer, keyed by region id.
    peer_states: HashMap<String, PeerState>,
    /// Snapshot transfers currently in progress, keyed by snapshot id.
    active_snapshots: HashMap<String, SnapshotTransfer>,
    /// Overall health of the replication subsystem.
    status: ReplicationStatus,
}

/// Last-known status of a single peer region.
#[derive(Debug, Clone)]
#[allow(dead_code)]
struct PeerState {
    /// When this node last heard from the peer.
    last_seen: Instant,
    /// Last vector clock the peer reported.
    last_clock: VectorClock,
    /// Whether the peer link is considered up.
    connected: bool,
    /// Replication lag the peer reported, in milliseconds.
    lag_ms: u64,
    // presumably ops sent to the peer and not yet acknowledged — confirm
    in_flight: u64,
}

/// Progress record for one snapshot transfer.
#[allow(dead_code)]
struct SnapshotTransfer {
    id: String,
    /// Whether this node is sending or receiving the snapshot.
    direction: TransferDirection,
    chunks_transferred: u64,
    total_chunks: u64,
    start_time: Instant,
}

/// Direction of a snapshot transfer relative to this node.
#[derive(Debug)]
enum TransferDirection {
    Send,
    Receive,
}

/// Coarse health classification of the replication subsystem.
#[derive(Debug, Clone)]
enum ReplicationStatus {
    Healthy,
    Degraded,
    PartialOutage,
    FullOutage,
}
402
/// Owns peer connections and the queue of incoming network messages.
struct NetworkManager {
    /// Active peer connections keyed by region id.
    connections: Arc<RwLock<HashMap<String, PeerConnection>>>,
    /// Producer side of the incoming message queue.
    message_tx: mpsc::Sender<NetworkMessage>,
    /// Consumer side; drained by the message-processor task.
    message_rx: Arc<Mutex<mpsc::Receiver<NetworkMessage>>>,
}

/// Handle to one peer link.
struct PeerConnection {
    region_id: String,
    /// Background task driving the connection, if one is running.
    handle: Arc<Mutex<Option<tokio::task::JoinHandle<()>>>>,
    /// Channel used to ship replication ops to this peer.
    send_tx: mpsc::Sender<ReplicationOp>,
}

/// Messages delivered to the message-processor task.
#[derive(Debug)]
enum NetworkMessage {
    /// A replication op received from a peer region.
    IncomingOp {
        _from_region: String,
        _op: Box<ReplicationOp>,
    },
    /// A change in the status of a peer connection.
    ConnectionEvent {
        region_id: String,
        event: ConnectionEvent,
    },
}

/// Lifecycle events for a peer connection.
#[derive(Debug)]
enum ConnectionEvent {
    Connected,
    Disconnected,
    Error(String),
}

/// Applies the configured conflict-resolution policy.
struct ConflictResolver {
    strategy: ConflictResolution,
    /// Priority table used by `ConflictResolution::RegionPriority`
    /// (lower number wins).
    region_priorities: HashMap<String, u32>,
}

/// Append-only durability log for replication ops.
struct WriteAheadLog {
    /// Open handle to the current WAL file, once initialized.
    current_file: Option<std::fs::File>,
    /// In-memory copy of appended entries.
    entries: VecDeque<WalEntry>,
    /// Bytes written since the last size reset.
    current_size: usize,
    config: PersistenceConfig,
}

/// One serialized record in the write-ahead log.
#[derive(Debug, Serialize, Deserialize)]
struct WalEntry {
    /// Sequence number within the in-memory queue.
    seq: u64,
    /// Seconds since the Unix epoch when the entry was written.
    timestamp: u64,
    /// The replicated operation itself.
    op: ReplicationOp,
    /// Integrity checksum (currently always written as 0).
    checksum: u32,
}

/// Running counters exposed for observability.
#[derive(Debug, Default)]
struct ReplicationStats {
    ops_sent: u64,
    ops_received: u64,
    conflicts_detected: u64,
    conflicts_resolved: u64,
    bytes_sent: u64,
    bytes_received: u64,
    /// Mean peer-reported lag in milliseconds.
    avg_lag_ms: f64,
}
496
497impl ReplicationManager {
498 pub async fn new(config: ReplicationConfig) -> Result<Self, OxirsError> {
500 let (message_tx, message_rx) = mpsc::channel(10000);
501
502 let storage = Arc::new(RwLock::new(ReplicationStorage {
503 triples: HashMap::new(),
504 conflicts: HashMap::new(),
505 pending_ops: VecDeque::new(),
506 }));
507
508 let state = Arc::new(RwLock::new(ReplicationState {
509 vector_clock: VectorClock::new(),
510 peer_states: HashMap::new(),
511 active_snapshots: HashMap::new(),
512 status: ReplicationStatus::Healthy,
513 }));
514
515 let network = Arc::new(NetworkManager {
516 connections: Arc::new(RwLock::new(HashMap::new())),
517 message_tx,
518 message_rx: Arc::new(Mutex::new(message_rx)),
519 });
520
521 let resolver = Arc::new(ConflictResolver {
522 strategy: config.conflict_resolution.clone(),
523 region_priorities: HashMap::new(),
524 });
525
526 let wal = Arc::new(RwLock::new(WriteAheadLog {
527 current_file: None,
528 entries: VecDeque::new(),
529 current_size: 0,
530 config: config.persistence.clone(),
531 }));
532
533 Ok(ReplicationManager {
534 config,
535 storage,
536 state,
537 network,
538 resolver,
539 wal,
540 stats: Arc::new(RwLock::new(ReplicationStats::default())),
541 })
542 }
543
    /// Starts replication: opens the WAL, registers peers, and spawns the
    /// background tasks (message processing, heartbeats, lag monitoring,
    /// WAL checkpointing).
    pub async fn start(&self) -> Result<(), OxirsError> {
        self.initialize_wal().await?;

        self.connect_to_peers().await?;

        self.spawn_message_processor();

        self.spawn_heartbeat_sender();

        self.spawn_lag_monitor();

        self.spawn_wal_checkpoint();

        Ok(())
    }
566
567 pub async fn replicate_write(&self, triple: Triple, op_type: OpType) -> Result<(), OxirsError> {
569 let mut state = self.state.write().await;
570 state.vector_clock.increment(&self.config.region_id);
571
572 let versioned = VersionedTriple {
573 triple: triple.clone(),
574 version: state.vector_clock.clone(),
575 timestamp: std::time::SystemTime::now()
576 .duration_since(std::time::UNIX_EPOCH)
577 .expect("SystemTime should be after UNIX_EPOCH")
578 .as_secs(),
579 origin_region: self.config.region_id.clone(),
580 tx_id: Some(uuid::Uuid::new_v4().to_string()),
581 };
582
583 let op = match op_type {
584 OpType::Insert => ReplicationOp::Insert(versioned.clone()),
585 OpType::Delete => ReplicationOp::Delete(versioned.clone()),
586 };
587
588 self.write_to_wal(&op).await?;
590
591 let mut storage = self.storage.write().await;
593 match op_type {
594 OpType::Insert => {
595 storage.triples.insert(triple, versioned);
596 }
597 OpType::Delete => {
598 storage.triples.remove(&triple);
599 }
600 }
601
602 match &self.config.strategy {
604 ReplicationStrategy::SyncAll => {
605 self.replicate_sync_all(op).await?;
606 }
607 ReplicationStrategy::AsyncAll => {
608 self.replicate_async_all(op).await?;
609 }
610 ReplicationStrategy::SyncQuorum { n } => {
611 self.replicate_sync_quorum(op, *n).await?;
612 }
613 _ => {
614 self.replicate_async_all(op).await?;
616 }
617 }
618
619 Ok(())
620 }
621
622 pub async fn query(
624 &self,
625 pattern: &TriplePattern,
626 consistency: ConsistencyLevel,
627 ) -> Result<Vec<Triple>, OxirsError> {
628 match consistency {
629 ConsistencyLevel::Strong => {
630 self.sync_with_quorum().await?;
632 }
633 ConsistencyLevel::BoundedStaleness { max_lag_ms } => {
634 let state = self.state.read().await;
636 for peer in state.peer_states.values() {
637 if peer.lag_ms > max_lag_ms {
638 return Err(OxirsError::Store(
639 "Data staleness exceeds bound".to_string(),
640 ));
641 }
642 }
643 }
644 ConsistencyLevel::Eventual => {
645 }
647 }
648
649 let storage = self.storage.read().await;
650 let results: Vec<Triple> = storage
651 .triples
652 .values()
653 .filter(|vt| pattern.matches(&vt.triple))
654 .map(|vt| vt.triple.clone())
655 .collect();
656
657 Ok(results)
658 }
659
660 async fn handle_incoming_op(
662 &self,
663 from_region: String,
664 op: ReplicationOp,
665 ) -> Result<(), OxirsError> {
666 let mut stats = self.stats.write().await;
667 stats.ops_received += 1;
668 drop(stats);
669
670 match &op {
671 ReplicationOp::Insert(versioned) | ReplicationOp::Delete(versioned) => {
672 let mut storage = self.storage.write().await;
673 let mut state = self.state.write().await;
674
675 if let Some(existing) = storage.triples.get(&versioned.triple) {
677 if existing.version.is_concurrent(&versioned.version) {
678 let mut stats = self.stats.write().await;
680 stats.conflicts_detected += 1;
681 drop(stats);
682
683 let winner = self.resolver.resolve_conflict(existing, versioned).await?;
685 storage.triples.insert(versioned.triple.clone(), winner);
686
687 storage
689 .conflicts
690 .entry(versioned.triple.clone())
691 .or_insert_with(Vec::new)
692 .push(versioned.clone());
693 } else if versioned.version.happens_before(&existing.version) {
694 return Ok(());
696 } else {
697 storage
699 .triples
700 .insert(versioned.triple.clone(), versioned.clone());
701 }
702 } else {
703 if matches!(op, ReplicationOp::Insert(_)) {
705 storage
706 .triples
707 .insert(versioned.triple.clone(), versioned.clone());
708 }
709 }
710
711 state.vector_clock.merge(&versioned.version);
713 }
714 ReplicationOp::Batch(ops) => {
715 for op in ops {
716 Box::pin(self.handle_incoming_op(from_region.clone(), op.clone())).await?;
717 }
718 }
719 ReplicationOp::Heartbeat(info) => {
720 self.handle_heartbeat(from_region, info.clone()).await?;
721 }
722 _ => {}
723 }
724
725 Ok(())
726 }
727
728 async fn initialize_wal(&self) -> Result<(), OxirsError> {
730 let mut wal = self.wal.write().await;
731 std::fs::create_dir_all(&wal.config.wal_path)?;
732
733 let wal_file = std::fs::OpenOptions::new()
735 .create(true)
736 .append(true)
737 .open(format!("{}/wal.log", wal.config.wal_path))?;
738
739 wal.current_file = Some(wal_file);
740 Ok(())
741 }
742
743 async fn connect_to_peers(&self) -> Result<(), OxirsError> {
745 for peer in &self.config.peers {
746 if peer.active {
747 tracing::info!("Connecting to peer region: {}", peer.region_id);
749
750 let mut state = self.state.write().await;
752 state.peer_states.insert(
753 peer.region_id.clone(),
754 PeerState {
755 last_seen: Instant::now(),
756 last_clock: VectorClock::new(),
757 connected: true,
758 lag_ms: 0,
759 in_flight: 0,
760 },
761 );
762 }
763 }
764 Ok(())
765 }
766
    /// Spawns the background task that drains the network message queue.
    ///
    /// Incoming replication ops are dispatched to `handle_incoming_op_static`
    /// on their own tasks so a slow op cannot block the queue; connection
    /// events update the matching peer's bookkeeping in place.
    fn spawn_message_processor(&self) {
        let storage = self.storage.clone();
        let state = self.state.clone();
        let network = self.network.clone();
        let resolver = self.resolver.clone();
        let stats = self.stats.clone();

        tokio::spawn(async move {
            // Only this task holds the receiver; the Mutex is effectively a
            // single-owner handoff.
            let mut rx = network.message_rx.lock().await;
            while let Some(msg) = rx.recv().await {
                match msg {
                    NetworkMessage::IncomingOp { _from_region, _op } => {
                        let storage = storage.clone();
                        let state = state.clone();
                        let resolver = resolver.clone();
                        let stats = stats.clone();

                        // Process each op concurrently; failures are logged
                        // rather than propagated.
                        tokio::spawn(async move {
                            if let Err(e) = Self::handle_incoming_op_static(
                                _from_region,
                                *_op,
                                storage,
                                state,
                                resolver,
                                stats,
                            )
                            .await
                            {
                                tracing::error!("Error handling incoming op: {}", e);
                            }
                        });
                    }
                    NetworkMessage::ConnectionEvent { region_id, event } => {
                        // Events for unknown regions are silently dropped.
                        let mut state_guard = state.write().await;
                        if let Some(peer_state) = state_guard.peer_states.get_mut(&region_id) {
                            match event {
                                ConnectionEvent::Connected => {
                                    peer_state.connected = true;
                                    peer_state.last_seen = Instant::now();
                                }
                                ConnectionEvent::Disconnected => {
                                    peer_state.connected = false;
                                }
                                ConnectionEvent::Error(err) => {
                                    tracing::error!("Connection error for {}: {}", region_id, err);
                                    peer_state.connected = false;
                                }
                            }
                        }
                    }
                }
            }
        });
    }
823
824 async fn handle_incoming_op_static(
826 from_region: String,
827 op: ReplicationOp,
828 storage: Arc<RwLock<ReplicationStorage>>,
829 state: Arc<RwLock<ReplicationState>>,
830 resolver: Arc<ConflictResolver>,
831 stats: Arc<RwLock<ReplicationStats>>,
832 ) -> Result<(), OxirsError> {
833 let mut stats_guard = stats.write().await;
835 stats_guard.ops_received += 1;
836 drop(stats_guard);
837
838 match &op {
839 ReplicationOp::Insert(versioned) | ReplicationOp::Delete(versioned) => {
840 let mut storage_guard = storage.write().await;
841 let mut state_guard = state.write().await;
842
843 if let Some(existing) = storage_guard.triples.get(&versioned.triple) {
845 if existing.version.is_concurrent(&versioned.version) {
846 let mut stats_guard = stats.write().await;
848 stats_guard.conflicts_detected += 1;
849 drop(stats_guard);
850
851 let winner = resolver.resolve_conflict(existing, versioned).await?;
853 storage_guard
854 .triples
855 .insert(versioned.triple.clone(), winner);
856
857 storage_guard
859 .conflicts
860 .entry(versioned.triple.clone())
861 .or_insert_with(Vec::new)
862 .push(versioned.clone());
863 } else if versioned.version.happens_before(&existing.version) {
864 return Ok(());
866 } else {
867 storage_guard
869 .triples
870 .insert(versioned.triple.clone(), versioned.clone());
871 }
872 } else {
873 if matches!(op, ReplicationOp::Insert(_)) {
875 storage_guard
876 .triples
877 .insert(versioned.triple.clone(), versioned.clone());
878 }
879 }
880
881 state_guard.vector_clock.merge(&versioned.version);
883 }
884 ReplicationOp::Batch(ops) => {
885 for op_item in ops {
886 Box::pin(Self::handle_incoming_op_static(
887 from_region.clone(),
888 op_item.clone(),
889 storage.clone(),
890 state.clone(),
891 resolver.clone(),
892 stats.clone(),
893 ))
894 .await?;
895 }
896 }
897 ReplicationOp::Heartbeat(info) => {
898 let mut state_guard = state.write().await;
899 if let Some(peer) = state_guard.peer_states.get_mut(&from_region) {
900 peer.last_seen = std::time::Instant::now();
901 peer.lag_ms = info.lag_ms;
902 }
903 }
904 _ => {}
905 }
906
907 Ok(())
908 }
909
    /// Spawns the background task that sends a heartbeat to every connected
    /// peer every 5 seconds.
    fn spawn_heartbeat_sender(&self) {
        let config = self.config.clone();
        let _state = self.state.clone();
        let network = self.network.clone();

        tokio::spawn(async move {
            let mut interval = interval(Duration::from_secs(5));

            loop {
                interval.tick().await;

                let heartbeat = ReplicationOp::Heartbeat(HeartbeatInfo {
                    region_id: config.region_id.clone(),
                    timestamp: std::time::SystemTime::now()
                        .duration_since(std::time::UNIX_EPOCH)
                        .expect("SystemTime should be after UNIX_EPOCH")
                        .as_secs(),
                    // NOTE(review): load metrics and lag are hard-coded to
                    // zero — real values still need to be sampled.
                    load: LoadMetrics {
                        cpu_percent: 0.0,
                        memory_percent: 0.0,
                        disk_percent: 0.0,
                        network_mbps: 0.0,
                        connections: 0,
                    },
                    lag_ms: 0,
                });

                // Best-effort: a failed send to one peer is ignored.
                let connections = network.connections.read().await;
                for (_, conn) in connections.iter() {
                    let _ = conn.send_tx.send(heartbeat.clone()).await;
                }
            }
        });
    }
946
947 fn spawn_lag_monitor(&self) {
949 let state = self.state.clone();
950 let stats = self.stats.clone();
951
952 tokio::spawn(async move {
953 let mut interval = interval(Duration::from_secs(10));
954
955 loop {
956 interval.tick().await;
957
958 let state_guard = state.read().await;
959 let mut total_lag = 0u64;
960 let mut peer_count = 0u64;
961
962 for peer_state in state_guard.peer_states.values() {
963 total_lag += peer_state.lag_ms;
964 peer_count += 1;
965 }
966
967 if peer_count > 0 {
968 let mut stats_guard = stats.write().await;
969 stats_guard.avg_lag_ms = total_lag as f64 / peer_count as f64;
970 }
971 }
972 });
973 }
974
    /// Spawns the background task that checks WAL size on every checkpoint
    /// interval.
    fn spawn_wal_checkpoint(&self) {
        let wal = self.wal.clone();
        let config = self.config.persistence.clone();

        tokio::spawn(async move {
            let mut interval = interval(config.checkpoint_interval);

            loop {
                interval.tick().await;

                let mut wal_guard = wal.write().await;
                if wal_guard.current_size > config.max_wal_size {
                    // NOTE(review): only the size counter is reset — no file
                    // is actually rotated or truncated, and the in-memory
                    // `entries` queue is never trimmed. Confirm intended
                    // rotation behavior.
                    tracing::info!("Rotating WAL file");
                    wal_guard.current_size = 0;
                }
            }
        });
    }
997
    /// Appends a replication op to the write-ahead log and fsyncs it.
    ///
    /// The entry is serialized with oxicode, written to the open WAL file
    /// (if initialized), and also kept in the in-memory `entries` queue.
    async fn write_to_wal(&self, op: &ReplicationOp) -> Result<(), OxirsError> {
        let mut wal = self.wal.write().await;

        let entry = WalEntry {
            // NOTE(review): seq is derived from the in-memory queue length,
            // so it restarts after a process restart — confirm recovery does
            // not rely on it being globally monotonic.
            seq: wal.entries.len() as u64,
            timestamp: std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .expect("SystemTime should be after UNIX_EPOCH")
                .as_secs(),
            op: op.clone(),
            // Placeholder: no checksum is actually computed yet.
            checksum: 0,
        };

        let serialized = oxicode::serde::encode_to_vec(&entry, oxicode::config::standard())?;
        if let Some(ref mut file) = wal.current_file {
            use std::io::Write;
            file.write_all(&serialized)?;
            // sync_all makes the entry durable before the write is applied.
            file.sync_all()?;
        }

        // NOTE(review): `entries` grows without bound — nothing visible here
        // ever drains it, so memory usage tracks total WAL volume.
        wal.entries.push_back(entry);
        wal.current_size += serialized.len();

        Ok(())
    }
1025
1026 async fn replicate_sync_all(&self, op: ReplicationOp) -> Result<(), OxirsError> {
1028 let connections = self.network.connections.read().await;
1029 let mut futures = Vec::new();
1030
1031 for (_region_id, conn) in connections.iter() {
1032 let tx = conn.send_tx.clone();
1033 let op_clone = op.clone();
1034 let future = async move { tx.send(op_clone).await };
1035 futures.push(future);
1036 }
1037
1038 let results = futures::future::join_all(futures).await;
1040
1041 for result in results {
1043 if result.is_err() {
1044 return Err(OxirsError::Store("Replication failed".to_string()));
1045 }
1046 }
1047
1048 Ok(())
1049 }
1050
1051 async fn replicate_async_all(&self, op: ReplicationOp) -> Result<(), OxirsError> {
1053 let connections = self.network.connections.read().await;
1054
1055 for (_, conn) in connections.iter() {
1056 let _ = conn.send_tx.try_send(op.clone());
1058 }
1059
1060 Ok(())
1061 }
1062
1063 async fn replicate_sync_quorum(&self, op: ReplicationOp, n: usize) -> Result<(), OxirsError> {
1065 let connections = self.network.connections.read().await;
1066
1067 if connections.len() < n - 1 {
1068 return Err(OxirsError::Store(format!(
1069 "Not enough regions for quorum: need {}, have {}",
1070 n,
1071 connections.len() + 1
1072 )));
1073 }
1074
1075 let mut futures = Vec::new();
1076 for (_, conn) in connections.iter().take(n - 1) {
1077 let tx = conn.send_tx.clone();
1078 let op_clone = op.clone();
1079 let future = async move { tx.send(op_clone).await };
1080 futures.push(future);
1081 }
1082
1083 let results = futures::future::join_all(futures).await;
1085
1086 let successes = results.iter().filter(|r| r.is_ok()).count();
1087 if successes + 1 >= n {
1088 Ok(())
1089 } else {
1090 Err(OxirsError::Store("Quorum not achieved".to_string()))
1091 }
1092 }
1093
    /// Placeholder for strong-consistency synchronization.
    ///
    /// NOTE(review): not implemented — `ConsistencyLevel::Strong` reads
    /// currently behave exactly like `Eventual` reads.
    async fn sync_with_quorum(&self) -> Result<(), OxirsError> {
        Ok(())
    }
1099
1100 async fn handle_heartbeat(
1102 &self,
1103 from_region: String,
1104 info: HeartbeatInfo,
1105 ) -> Result<(), OxirsError> {
1106 let mut state = self.state.write().await;
1107 if let Some(peer) = state.peer_states.get_mut(&from_region) {
1108 peer.last_seen = Instant::now();
1109 peer.lag_ms = info.lag_ms;
1110 }
1111 Ok(())
1112 }
1113}
1114
1115impl ConflictResolver {
1116 async fn resolve_conflict(
1118 &self,
1119 existing: &VersionedTriple,
1120 incoming: &VersionedTriple,
1121 ) -> Result<VersionedTriple, OxirsError> {
1122 match &self.strategy {
1123 ConflictResolution::LastWriteWins => {
1124 if incoming.timestamp > existing.timestamp {
1125 Ok(incoming.clone())
1126 } else {
1127 Ok(existing.clone())
1128 }
1129 }
1130 ConflictResolution::VectorClock => {
1131 Ok(existing.clone())
1133 }
1134 ConflictResolution::RegionPriority => {
1135 let existing_priority = self
1136 .region_priorities
1137 .get(&existing.origin_region)
1138 .copied()
1139 .unwrap_or(999);
1140 let incoming_priority = self
1141 .region_priorities
1142 .get(&incoming.origin_region)
1143 .copied()
1144 .unwrap_or(999);
1145
1146 if incoming_priority < existing_priority {
1147 Ok(incoming.clone())
1148 } else {
1149 Ok(existing.clone())
1150 }
1151 }
1152 _ => Ok(existing.clone()),
1153 }
1154 }
1155}
1156
/// Kind of local write being replicated.
#[derive(Debug, Clone)]
pub enum OpType {
    Insert,
    Delete,
}
1163
/// Read consistency requested by callers of `ReplicationManager::query`.
#[derive(Debug, Clone)]
pub enum ConsistencyLevel {
    /// Synchronize with a quorum before reading.
    Strong,
    /// Fail the read if any peer's lag exceeds the bound (milliseconds).
    BoundedStaleness { max_lag_ms: u64 },
    /// Read local state as-is.
    Eventual,
}
1174
#[cfg(test)]
mod tests {
    use super::*;
    use crate::model::{Literal, NamedNode};

    /// Exercises increment/merge and the happens-before / concurrency
    /// relations on `VectorClock`.
    #[test]
    fn test_vector_clock() {
        let mut clock1 = VectorClock::new();
        let mut clock2 = VectorClock::new();

        clock1.increment("region1");
        clock1.increment("region1");
        clock2.increment("region2");

        // Disjoint clocks are concurrent: neither happens-before the other.
        assert!(!clock1.happens_before(&clock2));
        assert!(!clock2.happens_before(&clock1));
        assert!(clock1.is_concurrent(&clock2));

        // After merging clock1 into clock2, clock2 strictly dominates.
        clock2.merge(&clock1);
        assert!(clock1.happens_before(&clock2));
    }

    /// End-to-end smoke test: build a manager with no peers, write one
    /// triple, and read it back under eventual consistency.
    #[tokio::test]
    async fn test_replication_manager() {
        let config = ReplicationConfig {
            region_id: "us-east-1".to_string(),
            region: RegionConfig {
                name: "US East".to_string(),
                location: GeographicLocation {
                    latitude: 38.7,
                    longitude: -77.0,
                    continent: "North America".to_string(),
                    country: "USA".to_string(),
                },
                availability_zones: vec!["us-east-1a".to_string(), "us-east-1b".to_string()],
                capacity: RegionCapacity {
                    read_units: 1000,
                    write_units: 500,
                    storage_gb: 100,
                    auto_scaling: true,
                },
            },
            peers: vec![],
            strategy: ReplicationStrategy::AsyncAll,
            conflict_resolution: ConflictResolution::LastWriteWins,
            network: NetworkConfig {
                connect_timeout: Duration::from_secs(5),
                request_timeout: Duration::from_secs(30),
                max_retries: 3,
                compression: true,
                encryption: EncryptionConfig {
                    tls_enabled: true,
                    cert_path: None,
                    key_path: None,
                    ca_path: None,
                },
            },
            persistence: PersistenceConfig {
                wal_path: "/tmp/wal".to_string(),
                checkpoint_interval: Duration::from_secs(300),
                max_wal_size: 100 * 1024 * 1024,
                wal_compression: true,
            },
        };

        let manager = ReplicationManager::new(config)
            .await
            .expect("async operation should succeed");

        let triple = Triple::new(
            NamedNode::new("http://example.org/s").expect("valid IRI"),
            NamedNode::new("http://example.org/p").expect("valid IRI"),
            crate::model::Object::Literal(Literal::new("test")),
        );

        manager
            .replicate_write(triple.clone(), OpType::Insert)
            .await
            .expect("operation should succeed");

        // An unconstrained pattern matches everything stored.
        let pattern = TriplePattern::new(None, None, None);
        let results = manager
            .query(&pattern, ConsistencyLevel::Eventual)
            .await
            .expect("operation should succeed");
        assert_eq!(results.len(), 1);
        assert_eq!(results[0], triple);
    }
}