//! Replication management for DHT records: tracks which peers hold each
//! key's replicas, schedules replication tasks toward the configured
//! replication factor, and re-replicates when peers are lost or records
//! become stale.

use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};

use bytes::Bytes;
use shadow_core::{PeerId, PeerInfo};

use crate::node::DHTNode;

/// Priority of a replication task; higher values should be dispatched first.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ReplicationPriority {
    /// Background replication that may be deferred.
    Low = 0,
    /// Routine replication.
    Normal = 1,
    /// Urgent replication, e.g. repair of an under-replicated key.
    High = 2,
}

/// A single unit of replication work: push one value to one target peer.
#[derive(Debug, Clone)]
pub struct ReplicationTask {
    /// Key of the record being replicated.
    pub key: [u8; 32],
    /// Value to push to the target peer.
    pub value: Bytes,
    /// Peer that should receive the replica.
    pub target_peer: PeerId,
    /// Scheduling priority of this task.
    pub priority: ReplicationPriority,
    /// When the task was created.
    pub created_at: Instant,
    /// Optional deadline after which the task should be discarded.
    pub deadline: Option<Instant>,
}

/// Bookkeeping for a single replicated key.
#[derive(Debug, Clone)]
struct ReplicaInfo {
    /// Peers confirmed to hold a replica of the value.
    holders: HashSet<PeerId>,
    /// When the value was first stored locally.
    stored_at: Instant,
    /// When replication of this key was last refreshed.
    last_refresh: Instant,
    /// Size of the value in bytes.
    value_size: usize,
}

/// Aggregate replication statistics reported by [`ReplicationManager::stats`].
#[derive(Debug, Clone, Default)]
pub struct ReplicationStats {
    pub total_keys: usize,
    pub fully_replicated: usize,
    pub under_replicated: usize,
    pub pending_tasks: usize,
    pub total_replicas: usize,
}

/// Configuration for the replication manager.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Target number of replicas per key.
    pub factor: usize,
    /// How often a key should be re-pushed to its closest peers.
    pub refresh_interval: Duration,
    /// How long replica bookkeeping is kept before it expires.
    pub ttl: Duration,
    /// Intended cap on the number of queued replication tasks.
    pub max_pending: usize,
}

impl Default for ReplicationConfig {
    fn default() -> Self {
        Self {
            factor: 3,
            refresh_interval: Duration::from_secs(3600),
            ttl: Duration::from_secs(86400),
            max_pending: 1000,
        }
    }
}
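
// Illustrative configuration override (values here are examples only): callers
// can tweak individual fields with struct-update syntax and keep the remaining
// defaults, e.g.
//
//     let config = ReplicationConfig {
//         factor: 5,
//         refresh_interval: Duration::from_secs(600),
//         ..Default::default()
//     };
//     let mgr = ReplicationManager::new(config);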

/// Tracks replica placement for locally stored keys and produces
/// [`ReplicationTask`]s that the caller (the network layer) is expected to execute.
pub struct ReplicationManager {
    /// Replication parameters.
    config: ReplicationConfig,
    /// Per-key bookkeeping of confirmed replica holders.
    replicas: HashMap<[u8; 32], ReplicaInfo>,
    /// Tasks scheduled but not yet handed to the caller.
    pending: VecDeque<ReplicationTask>,
}

impl ReplicationManager {
    /// Creates a manager with the given configuration.
    pub fn new(config: ReplicationConfig) -> Self {
        Self {
            config,
            replicas: HashMap::new(),
            pending: VecDeque::new(),
        }
    }

    /// Creates a manager with the given replication factor and defaults otherwise.
    pub fn with_factor(factor: usize) -> Self {
        Self::new(ReplicationConfig {
            factor,
            ..Default::default()
        })
    }

    /// Schedules replication of `key`/`value` toward the configured factor,
    /// choosing targets from `closest_peers` that are not already confirmed
    /// holders. Returns the newly created tasks; they are also queued internally.
    pub fn schedule_replication(
        &mut self,
        key: [u8; 32],
        value: Bytes,
        closest_peers: &[PeerInfo],
    ) -> Vec<ReplicationTask> {
        let now = Instant::now();

        // Create or refresh the bookkeeping entry for this key.
        let info = self.replicas.entry(key).or_insert_with(|| ReplicaInfo {
            holders: HashSet::new(),
            stored_at: now,
            last_refresh: now,
            value_size: value.len(),
        });
        info.last_refresh = now;
        info.value_size = value.len();

        // How many additional replicas are needed to reach the factor.
        let current = info.holders.len();
        let needed = self.config.factor.saturating_sub(current);

        let mut tasks = Vec::new();

        // Take enough candidates to cover existing holders plus the shortfall.
        for peer in closest_peers.iter().take(needed + current) {
            if !info.holders.contains(&peer.id) && tasks.len() < needed {
                let task = ReplicationTask {
                    key,
                    value: value.clone(),
                    target_peer: peer.id,
                    priority: ReplicationPriority::High,
                    created_at: now,
                    deadline: Some(now + Duration::from_secs(60)),
                };
                tasks.push(task.clone());
                self.pending.push_back(task);
            }
        }

        tasks
    }
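
    // Sketch of how a caller might consume the returned tasks (the actual
    // transport/send step lives outside this module and is only assumed here):
    //
    //     let tasks = mgr.schedule_replication(key, value, &closest);
    //     for task in tasks {
    //         // ... send a store request for `task.key` to `task.target_peer`,
    //         // and on acknowledgement:
    //         mgr.confirm_replication(task.key, task.target_peer);
    //     }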

    /// Records that `peer` has acknowledged storing the value for `key`.
    pub fn confirm_replication(&mut self, key: [u8; 32], peer: PeerId) {
        if let Some(info) = self.replicas.get_mut(&key) {
            info.holders.insert(peer);
        }
    }

    /// Drains and returns all queued tasks for the caller to dispatch.
    pub fn take_pending_tasks(&mut self) -> Vec<ReplicationTask> {
        self.pending.drain(..).collect()
    }

    /// Number of tasks currently queued.
    pub fn pending_count(&self) -> usize {
        self.pending.len()
    }

    /// Re-schedules replication for every key whose last refresh is older than
    /// the configured `refresh_interval`, re-reading the value from `dht` and
    /// pushing it to the currently closest peers.
    pub fn check_refresh(&mut self, dht: &DHTNode) -> Vec<ReplicationTask> {
        let now = Instant::now();
        let mut tasks = Vec::new();

        let stale_keys: Vec<[u8; 32]> = self
            .replicas
            .iter()
            .filter(|(_, info)| {
                now.duration_since(info.last_refresh) >= self.config.refresh_interval
            })
            .map(|(k, _)| *k)
            .collect();

        for key in stale_keys {
            if let Some(stored) = dht.get_value(&key) {
                let key_peer = PeerId::from_bytes(key);
                let closest = dht.find_closest_peers(&key_peer, self.config.factor);
                let new_tasks = self.schedule_replication(key, stored.data.clone(), &closest);
                tasks.extend(new_tasks);
            }
        }

        tasks
    }
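
    // Sketch of a periodic maintenance tick that a node's event loop might run
    // (the interval and driving loop are assumptions, not part of this module):
    //
    //     // every few minutes:
    //     let _ = mgr.check_refresh(&dht);
    //     let _expired = mgr.cleanup_expired();
    //     for task in mgr.take_pending_tasks() {
    //         // dispatch `task` to `task.target_peer` over the network
    //     }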

    /// Removes `lost_peer` from every holder set and schedules repair tasks
    /// for each key that dropped below the replication factor as a result.
    pub fn handle_peer_lost(&mut self, lost_peer: &PeerId, dht: &DHTNode) -> Vec<ReplicationTask> {
        let now = Instant::now();
        let mut tasks = Vec::new();

        // Keys for which the lost peer was a confirmed holder.
        let affected_keys: Vec<[u8; 32]> = self
            .replicas
            .iter()
            .filter(|(_, info)| info.holders.contains(lost_peer))
            .map(|(k, _)| *k)
            .collect();

        for key in affected_keys {
            if let Some(info) = self.replicas.get_mut(&key) {
                // The lost peer no longer counts toward this key's replica set.
                info.holders.remove(lost_peer);
            }

            if let Some(stored) = dht.get_value(&key) {
                // Look beyond the factor to find enough replacement candidates.
                let key_peer = PeerId::from_bytes(key);
                let closest = dht.find_closest_peers(&key_peer, self.config.factor * 2);

                if let Some(info) = self.replicas.get(&key) {
                    let current = info.holders.len();
                    let needed = self.config.factor.saturating_sub(current);
                    // Count scheduled repairs per key, not across the whole batch.
                    let mut scheduled = 0;

                    for peer in closest.iter() {
                        if !info.holders.contains(&peer.id) && scheduled < needed {
                            let task = ReplicationTask {
                                key,
                                value: stored.data.clone(),
                                target_peer: peer.id,
                                priority: ReplicationPriority::High,
                                created_at: now,
                                deadline: Some(now + Duration::from_secs(30)),
                            };
                            tasks.push(task.clone());
                            self.pending.push_back(task);
                            scheduled += 1;
                        }
                    }
                }
            }
        }

        tasks
    }
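
    // Typically wired to the routing table's peer-failure path; a hypothetical
    // event handler might look like:
    //
    //     // on peer-disconnected(peer_id):
    //     let repairs = mgr.handle_peer_lost(&peer_id, &dht);
    //     // `repairs` (also queued internally) are then dispatched like any
    //     // other replication task.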

    /// Drops bookkeeping for keys whose TTL has elapsed; returns how many
    /// entries were removed.
    pub fn cleanup_expired(&mut self) -> usize {
        let now = Instant::now();
        let before = self.replicas.len();
        self.replicas
            .retain(|_, info| now.duration_since(info.stored_at) < self.config.ttl);
        before - self.replicas.len()
    }

    /// Number of confirmed replicas for `key`.
    pub fn replica_count(&self, key: &[u8; 32]) -> usize {
        self.replicas.get(key).map(|i| i.holders.len()).unwrap_or(0)
    }

    /// Whether `key` has reached the configured replication factor.
    pub fn is_fully_replicated(&self, key: &[u8; 32]) -> bool {
        self.replica_count(key) >= self.config.factor
    }

    /// Keys that currently have fewer confirmed replicas than the factor.
    pub fn under_replicated_keys(&self) -> Vec<[u8; 32]> {
        self.replicas
            .iter()
            .filter(|(_, info)| info.holders.len() < self.config.factor)
            .map(|(k, _)| *k)
            .collect()
    }

    /// Confirmed holders of `key`, or an empty set if the key is unknown.
    pub fn holders(&self, key: &[u8; 32]) -> HashSet<PeerId> {
        self.replicas
            .get(key)
            .map(|i| i.holders.clone())
            .unwrap_or_default()
    }

    /// Snapshot of the current replication statistics.
    pub fn stats(&self) -> ReplicationStats {
        let total_keys = self.replicas.len();
        let fully_replicated = self
            .replicas
            .values()
            .filter(|i| i.holders.len() >= self.config.factor)
            .count();
        let total_replicas: usize = self.replicas.values().map(|i| i.holders.len()).sum();

        ReplicationStats {
            total_keys,
            fully_replicated,
            under_replicated: total_keys - fully_replicated,
            pending_tasks: self.pending.len(),
            total_replicas,
        }
    }

    /// The active replication configuration.
    pub fn config(&self) -> &ReplicationConfig {
        &self.config
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::NodeConfig;

    fn make_peer(id_byte: u8) -> PeerInfo {
        PeerInfo::new(
            PeerId::from_bytes([id_byte; 32]),
            vec![format!("/ip4/127.0.0.1/tcp/{}", 9000 + id_byte as u16)],
            [id_byte; 32],
            [id_byte; 32],
        )
    }

    #[test]
    fn test_schedule_replication() {
        let mut mgr = ReplicationManager::with_factor(3);
        let key = [1u8; 32];
        let value = Bytes::from_static(b"hello world");
        let peers: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        let tasks = mgr.schedule_replication(key, value, &peers);
        assert_eq!(tasks.len(), 3);
        assert!(tasks.iter().all(|t| t.priority == ReplicationPriority::High));
    }

    #[test]
    fn test_confirm_replication() {
        let mut mgr = ReplicationManager::with_factor(3);
        let key = [2u8; 32];
        let value = Bytes::from_static(b"data");
        let peers: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        let tasks = mgr.schedule_replication(key, value, &peers);
        assert_eq!(mgr.replica_count(&key), 0);

        for task in &tasks {
            mgr.confirm_replication(key, task.target_peer);
        }
        assert_eq!(mgr.replica_count(&key), 3);
        assert!(mgr.is_fully_replicated(&key));
    }

    #[test]
    fn test_handle_peer_lost() {
        let mut mgr = ReplicationManager::with_factor(3);
        let local_id = PeerId::random();
        let dht = DHTNode::new(local_id, NodeConfig::default());
        let key = [3u8; 32];
        let value = Bytes::from_static(b"important data");
        let peers: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        for p in &peers {
            let _ = dht.add_peer(p.clone());
        }

        dht.store_value(key, value.clone(), [0u8; 32]).unwrap();

        let tasks = mgr.schedule_replication(key, value, &peers);
        for t in &tasks {
            mgr.confirm_replication(key, t.target_peer);
        }
        assert_eq!(mgr.replica_count(&key), 3);

        let lost = tasks[0].target_peer;
        let re_tasks = mgr.handle_peer_lost(&lost, &dht);
        assert!(!re_tasks.is_empty());
        assert_eq!(mgr.replica_count(&key), 2);
    }

    #[test]
    fn test_under_replicated_keys() {
        let mut mgr = ReplicationManager::with_factor(3);
        let key = [4u8; 32];
        let value = Bytes::from_static(b"partial");
        let peers: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        mgr.schedule_replication(key, value, &peers);
        let under = mgr.under_replicated_keys();
        assert!(under.contains(&key));
    }

    #[test]
    fn test_stats() {
        let mut mgr = ReplicationManager::with_factor(2);
        let peers: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        let key_a = [10u8; 32];
        let tasks_a = mgr.schedule_replication(key_a, Bytes::from_static(b"aaa"), &peers);
        for t in &tasks_a {
            mgr.confirm_replication(key_a, t.target_peer);
        }

        let key_b = [11u8; 32];
        let tasks_b = mgr.schedule_replication(key_b, Bytes::from_static(b"bbb"), &peers);
        mgr.confirm_replication(key_b, tasks_b[0].target_peer);

        let s = mgr.stats();
        assert_eq!(s.total_keys, 2);
        assert_eq!(s.fully_replicated, 1);
        assert_eq!(s.under_replicated, 1);
        assert_eq!(s.total_replicas, 3);
    }

    #[test]
    fn test_take_pending_tasks() {
        let mut mgr = ReplicationManager::with_factor(2);
        let peers: Vec<PeerInfo> = (1..=3).map(make_peer).collect();

        mgr.schedule_replication([20u8; 32], Bytes::from_static(b"x"), &peers);
        assert!(mgr.pending_count() > 0);

        let taken = mgr.take_pending_tasks();
        assert!(!taken.is_empty());
        assert_eq!(mgr.pending_count(), 0);
    }
}