ruvector_graph/distributed/
replication.rs1use crate::distributed::shard::{EdgeData, GraphShard, NodeData, NodeId, ShardId};
10use crate::{GraphError, Result};
11use chrono::{DateTime, Utc};
12use dashmap::DashMap;
13use ruvector_replication::{
14 Replica, ReplicaRole, ReplicaSet, ReplicationLog, SyncManager, SyncMode,
15};
16use serde::{Deserialize, Serialize};
17use std::collections::{HashMap, HashSet};
18use std::sync::Arc;
19use tracing::{debug, info, warn};
20use uuid::Uuid;
21
/// Strategy for replicating graph data across shard replicas.
#[derive(Debug, Clone, Copy, PartialEq, Eq, Serialize, Deserialize)]
pub enum ReplicationStrategy {
    /// Replicate every operation to the owning shard's replica set as-is.
    FullShard,
    /// Fan high-degree nodes out across multiple shards; low-degree nodes
    /// replicate like `FullShard` (see `GraphReplication::replicate_node_add`).
    VertexCut,
    /// Subgraph-based replication. NOTE(review): currently falls back to
    /// full-shard behavior in `replicate_node_add` — not yet implemented.
    Subgraph,
    /// Mixed strategy. NOTE(review): currently falls back to full-shard
    /// behavior in `replicate_node_add` — not yet implemented.
    Hybrid,
}
34
/// Configuration for graph replication behavior.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct GraphReplicationConfig {
    /// Desired number of replicas per shard (also caps high-degree fan-out).
    pub replication_factor: usize,
    /// How operations are distributed across replicas.
    pub strategy: ReplicationStrategy,
    /// Degree at or above which a node is treated as "high degree" under
    /// `ReplicationStrategy::VertexCut`.
    pub high_degree_threshold: usize,
    /// Sync mode handed to each shard's `SyncManager`.
    pub sync_mode: SyncMode,
    /// Whether conflict resolution is enabled.
    /// NOTE(review): not read anywhere in this module yet — confirm consumer.
    pub enable_conflict_resolution: bool,
    /// Replication timeout in seconds.
    /// NOTE(review): not read anywhere in this module yet — confirm consumer.
    pub timeout_seconds: u64,
}
51
52impl Default for GraphReplicationConfig {
53 fn default() -> Self {
54 Self {
55 replication_factor: 3,
56 strategy: ReplicationStrategy::FullShard,
57 high_degree_threshold: 100,
58 sync_mode: SyncMode::Async,
59 enable_conflict_resolution: true,
60 timeout_seconds: 30,
61 }
62 }
63}
64
/// Coordinates replication of graph operations across shards.
pub struct GraphReplication {
    /// Replication behavior settings.
    config: GraphReplicationConfig,
    /// Replica set per shard, populated by `initialize_shard_replication`.
    replica_sets: Arc<DashMap<ShardId, Arc<ReplicaSet>>>,
    /// Sync manager per shard, populated alongside `replica_sets`.
    sync_managers: Arc<DashMap<ShardId, Arc<SyncManager>>>,
    /// Per-node degree counters, maintained by `replicate_edge_add`
    /// (used to detect high-degree nodes for vertex-cut replication).
    high_degree_nodes: Arc<DashMap<NodeId, usize>>,
    /// For nodes replicated to multiple shards: the shard ids (as strings)
    /// holding a copy.
    node_replicas: Arc<DashMap<NodeId, Vec<String>>>,
}
78
79impl GraphReplication {
80 pub fn new(config: GraphReplicationConfig) -> Self {
82 Self {
83 config,
84 replica_sets: Arc::new(DashMap::new()),
85 sync_managers: Arc::new(DashMap::new()),
86 high_degree_nodes: Arc::new(DashMap::new()),
87 node_replicas: Arc::new(DashMap::new()),
88 }
89 }
90
91 pub fn initialize_shard_replication(
93 &self,
94 shard_id: ShardId,
95 primary_node: String,
96 replica_nodes: Vec<String>,
97 ) -> Result<()> {
98 info!(
99 "Initializing replication for shard {} with {} replicas",
100 shard_id,
101 replica_nodes.len()
102 );
103
104 let mut replica_set = ReplicaSet::new(format!("shard-{}", shard_id));
106
107 replica_set
109 .add_replica(
110 &primary_node,
111 &format!("{}:9001", primary_node),
112 ReplicaRole::Primary,
113 )
114 .map_err(|e| GraphError::ReplicationError(e))?;
115
116 for (idx, node) in replica_nodes.iter().enumerate() {
118 replica_set
119 .add_replica(
120 &format!("{}-replica-{}", node, idx),
121 &format!("{}:9001", node),
122 ReplicaRole::Secondary,
123 )
124 .map_err(|e| GraphError::ReplicationError(e))?;
125 }
126
127 let replica_set = Arc::new(replica_set);
128
129 let log = Arc::new(ReplicationLog::new(&primary_node));
131
132 let sync_manager = Arc::new(SyncManager::new(Arc::clone(&replica_set), log));
134 sync_manager.set_sync_mode(self.config.sync_mode.clone());
135
136 self.replica_sets.insert(shard_id, replica_set);
137 self.sync_managers.insert(shard_id, sync_manager);
138
139 Ok(())
140 }
141
142 pub async fn replicate_node_add(&self, shard_id: ShardId, node: NodeData) -> Result<()> {
144 debug!(
145 "Replicating node addition: {} to shard {}",
146 node.id, shard_id
147 );
148
149 match self.config.strategy {
151 ReplicationStrategy::FullShard => {
152 self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node))
153 .await
154 }
155 ReplicationStrategy::VertexCut => {
156 let degree = self.get_node_degree(&node.id);
158 if degree >= self.config.high_degree_threshold {
159 self.replicate_high_degree_node(node).await
161 } else {
162 self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node))
163 .await
164 }
165 }
166 ReplicationStrategy::Subgraph | ReplicationStrategy::Hybrid => {
167 self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node))
168 .await
169 }
170 }
171 }
172
173 pub async fn replicate_edge_add(&self, shard_id: ShardId, edge: EdgeData) -> Result<()> {
175 debug!(
176 "Replicating edge addition: {} to shard {}",
177 edge.id, shard_id
178 );
179
180 self.increment_node_degree(&edge.from);
182 self.increment_node_degree(&edge.to);
183
184 self.replicate_to_shard(shard_id, ReplicationOp::AddEdge(edge))
185 .await
186 }
187
188 pub async fn replicate_node_delete(&self, shard_id: ShardId, node_id: NodeId) -> Result<()> {
190 debug!(
191 "Replicating node deletion: {} from shard {}",
192 node_id, shard_id
193 );
194
195 self.replicate_to_shard(shard_id, ReplicationOp::DeleteNode(node_id))
196 .await
197 }
198
199 pub async fn replicate_edge_delete(&self, shard_id: ShardId, edge_id: String) -> Result<()> {
201 debug!(
202 "Replicating edge deletion: {} from shard {}",
203 edge_id, shard_id
204 );
205
206 self.replicate_to_shard(shard_id, ReplicationOp::DeleteEdge(edge_id))
207 .await
208 }
209
210 async fn replicate_to_shard(&self, shard_id: ShardId, op: ReplicationOp) -> Result<()> {
212 let sync_manager = self
213 .sync_managers
214 .get(&shard_id)
215 .ok_or_else(|| GraphError::ShardError(format!("Shard {} not initialized", shard_id)))?;
216
217 let data = bincode::encode_to_vec(&op, bincode::config::standard())
219 .map_err(|e| GraphError::SerializationError(e.to_string()))?;
220
221 debug!("Replicating operation for shard {}", shard_id);
225
226 Ok(())
227 }
228
229 async fn replicate_high_degree_node(&self, node: NodeData) -> Result<()> {
231 info!(
232 "Replicating high-degree node {} to multiple shards",
233 node.id
234 );
235
236 let degree = self.get_node_degree(&node.id);
238 let replica_count =
239 (degree / self.config.high_degree_threshold).min(self.config.replication_factor);
240
241 let mut replica_shards = Vec::new();
242
243 for shard_id in 0..replica_count {
245 replica_shards.push(shard_id as ShardId);
246 }
247
248 for shard_id in replica_shards.clone() {
250 self.replicate_to_shard(shard_id, ReplicationOp::AddNode(node.clone()))
251 .await?;
252 }
253
254 self.node_replicas.insert(
256 node.id.clone(),
257 replica_shards.iter().map(|s| s.to_string()).collect(),
258 );
259
260 Ok(())
261 }
262
263 fn get_node_degree(&self, node_id: &NodeId) -> usize {
265 self.high_degree_nodes
266 .get(node_id)
267 .map(|d| *d.value())
268 .unwrap_or(0)
269 }
270
271 fn increment_node_degree(&self, node_id: &NodeId) {
273 self.high_degree_nodes
274 .entry(node_id.clone())
275 .and_modify(|d| *d += 1)
276 .or_insert(1);
277 }
278
279 pub fn get_replica_set(&self, shard_id: ShardId) -> Option<Arc<ReplicaSet>> {
281 self.replica_sets
282 .get(&shard_id)
283 .map(|r| Arc::clone(r.value()))
284 }
285
286 pub fn get_sync_manager(&self, shard_id: ShardId) -> Option<Arc<SyncManager>> {
288 self.sync_managers
289 .get(&shard_id)
290 .map(|s| Arc::clone(s.value()))
291 }
292
293 pub fn get_stats(&self) -> ReplicationStats {
295 ReplicationStats {
296 total_shards: self.replica_sets.len(),
297 high_degree_nodes: self.high_degree_nodes.len(),
298 replicated_nodes: self.node_replicas.len(),
299 strategy: self.config.strategy,
300 }
301 }
302
303 pub async fn health_check(&self) -> HashMap<ShardId, ReplicaHealth> {
305 let mut health = HashMap::new();
306
307 for entry in self.replica_sets.iter() {
308 let shard_id = *entry.key();
309 let replica_set = entry.value();
310
311 let healthy_count = self.config.replication_factor;
313
314 health.insert(
315 shard_id,
316 ReplicaHealth {
317 total_replicas: self.config.replication_factor,
318 healthy_replicas: healthy_count,
319 is_healthy: healthy_count >= (self.config.replication_factor / 2 + 1),
320 },
321 );
322 }
323
324 health
325 }
326
327 pub fn config(&self) -> &GraphReplicationConfig {
329 &self.config
330 }
331}
332
/// A replicable graph mutation, serialized and shipped to replicas.
#[derive(Debug, Clone, Serialize, Deserialize)]
enum ReplicationOp {
    /// Add a node with its full data.
    AddNode(NodeData),
    /// Add an edge with its full data.
    AddEdge(EdgeData),
    /// Delete the node with the given id.
    DeleteNode(NodeId),
    /// Delete the edge with the given id.
    DeleteEdge(String),
    /// Replace a node's data.
    /// NOTE(review): not yet emitted by any method in this module.
    UpdateNode(NodeData),
    /// Replace an edge's data.
    /// NOTE(review): not yet emitted by any method in this module.
    UpdateEdge(EdgeData),
}
343
/// Point-in-time counters describing replication state.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicationStats {
    /// Number of shards with initialized replication.
    pub total_shards: usize,
    /// Number of nodes with a tracked degree counter.
    pub high_degree_nodes: usize,
    /// Number of nodes replicated to multiple shards.
    pub replicated_nodes: usize,
    /// The strategy currently in effect.
    pub strategy: ReplicationStrategy,
}
352
/// Health summary for one shard's replica set.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct ReplicaHealth {
    /// Configured replica count for the shard.
    pub total_replicas: usize,
    /// Replicas considered healthy.
    pub healthy_replicas: usize,
    /// True when a quorum (majority) of replicas is healthy.
    pub is_healthy: bool,
}
360
#[cfg(test)]
mod tests {
    use super::*;
    use std::collections::HashMap;

    /// After initialization, shard 0 exposes both a replica set and a
    /// sync manager.
    #[tokio::test]
    async fn test_graph_replication() {
        let repl = GraphReplication::new(GraphReplicationConfig::default());

        repl.initialize_shard_replication(0, "node-1".to_string(), vec!["node-2".to_string()])
            .unwrap();

        assert!(repl.get_replica_set(0).is_some());
        assert!(repl.get_sync_manager(0).is_some());
    }

    /// Replicating a node addition to an initialized shard succeeds.
    #[tokio::test]
    async fn test_node_replication() {
        let repl = GraphReplication::new(GraphReplicationConfig::default());

        repl.initialize_shard_replication(0, "node-1".to_string(), vec!["node-2".to_string()])
            .unwrap();

        let node = NodeData {
            id: "test-node".to_string(),
            properties: HashMap::new(),
            labels: vec!["Test".to_string()],
        };

        assert!(repl.replicate_node_add(0, node).await.is_ok());
    }

    /// A fresh coordinator reports zero shards and the default strategy.
    #[test]
    fn test_replication_stats() {
        let repl = GraphReplication::new(GraphReplicationConfig::default());

        let stats = repl.get_stats();
        assert_eq!(stats.total_shards, 0);
        assert_eq!(stats.strategy, ReplicationStrategy::FullShard);
    }
}