// shadow_dht/replication.rs
//! DHT data replication manager.
//!
//! Tracks which values need replication across the network, schedules
//! replication tasks, handles periodic refresh, and re-replicates when
//! peers depart.

use std::collections::{HashMap, HashSet, VecDeque};
use std::time::{Duration, Instant};

use bytes::Bytes;
use shadow_core::{PeerId, PeerInfo};

use crate::node::DHTNode;

// ── Replication priority ────────────────────────────────────────────────────

/// Urgency level attached to a replication task.
///
/// Variants carry explicit discriminants so that a larger value always
/// means "more urgent", and the derived `Ord` lets tasks be compared or
/// sorted by priority directly.
#[derive(Debug, Clone, Copy, PartialEq, Eq, PartialOrd, Ord)]
pub enum ReplicationPriority {
    /// Background maintenance work (periodic refresh sweeps).
    Low = 0,
    /// Routine replication of already-stored data.
    Normal = 1,
    /// Urgent work — freshly stored data or re-replication after peer loss.
    High = 2,
}

// ── Replication task ────────────────────────────────────────────────────────

30/// A unit of work describing data that must be sent to a remote peer.
31#[derive(Debug, Clone)]
32pub struct ReplicationTask {
33    /// The key of the value to replicate
34    pub key: [u8; 32],
35    /// The serialised value
36    pub value: Bytes,
37    /// The peer that should receive the replica
38    pub target_peer: PeerId,
39    /// How urgently this should be replicated
40    pub priority: ReplicationPriority,
41    /// When this task was created
42    pub created_at: Instant,
43    /// Optional deadline — if `None`, best-effort
44    pub deadline: Option<Instant>,
45}

// ── Replica tracking ────────────────────────────────────────────────────────

49#[derive(Debug, Clone)]
50struct ReplicaInfo {
51    /// Peers known to hold a copy
52    holders: HashSet<PeerId>,
53    /// When the value was first stored locally
54    stored_at: Instant,
55    /// When we last refreshed the replicas
56    last_refresh: Instant,
57    /// Size of the value in bytes
58    value_size: usize,
59}

// ── Stats ───────────────────────────────────────────────────────────────────

/// Summary statistics for the replication sub-system.
#[derive(Debug, Clone, Default)]
pub struct ReplicationStats {
    /// Number of keys currently tracked.
    pub total_keys: usize,
    /// Keys whose confirmed replica count meets the configured factor.
    pub fully_replicated: usize,
    /// Keys with fewer confirmed replicas than the configured factor.
    pub under_replicated: usize,
    /// Tasks waiting in the pending queue.
    pub pending_tasks: usize,
    /// Total confirmed replicas summed over all keys.
    pub total_replicas: usize,
}

// ── Config ──────────────────────────────────────────────────────────────────

/// Tunable parameters for the replication manager.
#[derive(Debug, Clone)]
pub struct ReplicationConfig {
    /// Number of copies to maintain across the network (default: 3).
    pub factor: usize,
    /// Interval between refresh sweeps (default: 1 hour).
    pub refresh_interval: Duration,
    /// Value time-to-live (default: 24 hours).
    pub ttl: Duration,
    /// Maximum pending tasks before back-pressure (default: 1000).
    pub max_pending: usize,
}
88impl Default for ReplicationConfig {
89    fn default() -> Self {
90        Self {
91            factor: 3,
92            refresh_interval: Duration::from_secs(3600),
93            ttl: Duration::from_secs(86400),
94            max_pending: 1000,
95        }
96    }
97}

// ── ReplicationManager ──────────────────────────────────────────────────────

101/// Coordinates data replication across the DHT.
102///
103/// The manager does **not** perform any networking itself — it produces
104/// [`ReplicationTask`]s that a higher-level transport layer executes.
105pub struct ReplicationManager {
106    config: ReplicationConfig,
107    /// key → replica tracking info
108    replicas: HashMap<[u8; 32], ReplicaInfo>,
109    /// Pending replication work
110    pending: VecDeque<ReplicationTask>,
111}
113impl ReplicationManager {
114    /// Create a new replication manager with the given configuration.
115    pub fn new(config: ReplicationConfig) -> Self {
116        Self {
117            config,
118            replicas: HashMap::new(),
119            pending: VecDeque::new(),
120        }
121    }
122
123    /// Create with default config but a custom replication factor.
124    pub fn with_factor(factor: usize) -> Self {
125        Self::new(ReplicationConfig {
126            factor,
127            ..Default::default()
128        })
129    }
130
131    // ── Core operations ─────────────────────────────────────────────────
132
133    /// Schedule replication for a newly stored (or updated) value.
134    ///
135    /// Returns the tasks that should be executed by the transport layer.
136    pub fn schedule_replication(
137        &mut self,
138        key: [u8; 32],
139        value: Bytes,
140        closest_peers: &[PeerInfo],
141    ) -> Vec<ReplicationTask> {
142        let now = Instant::now();
143
144        // Insert or update tracking entry
145        let info = self.replicas.entry(key).or_insert_with(|| ReplicaInfo {
146            holders: HashSet::new(),
147            stored_at: now,
148            last_refresh: now,
149            value_size: value.len(),
150        });
151        info.last_refresh = now;
152        info.value_size = value.len();
153
154        // Determine how many additional replicas we need
155        let current = info.holders.len();
156        let needed = self.config.factor.saturating_sub(current);
157
158        let mut tasks = Vec::new();
159
160        // Select peers that don't already hold a copy
161        for peer in closest_peers.iter().take(needed + current) {
162            if !info.holders.contains(&peer.id) && tasks.len() < needed {
163                let task = ReplicationTask {
164                    key,
165                    value: value.clone(),
166                    target_peer: peer.id,
167                    priority: ReplicationPriority::High,
168                    created_at: now,
169                    deadline: Some(now + Duration::from_secs(60)),
170                };
171                tasks.push(task.clone());
172                self.pending.push_back(task);
173            }
174        }
175
176        tasks
177    }
178
179    /// Confirm that a replica was successfully stored on a peer.
180    pub fn confirm_replication(&mut self, key: [u8; 32], peer: PeerId) {
181        if let Some(info) = self.replicas.get_mut(&key) {
182            info.holders.insert(peer);
183        }
184    }
185
186    /// Drain and return all pending tasks.
187    pub fn take_pending_tasks(&mut self) -> Vec<ReplicationTask> {
188        self.pending.drain(..).collect()
189    }
190
191    /// Peek at the number of pending tasks.
192    pub fn pending_count(&self) -> usize {
193        self.pending.len()
194    }
195
196    // ── Maintenance ─────────────────────────────────────────────────────
197
198    /// Scan for keys that need a refresh (their replicas haven't been
199    /// refreshed within `refresh_interval`).
200    ///
201    /// The caller must supply the local DHT node so we can read stored
202    /// values and closest peers.
203    pub fn check_refresh(&mut self, dht: &DHTNode) -> Vec<ReplicationTask> {
204        let now = Instant::now();
205        let mut tasks = Vec::new();
206
207        let stale_keys: Vec<[u8; 32]> = self
208            .replicas
209            .iter()
210            .filter(|(_, info)| now.duration_since(info.last_refresh) >= self.config.refresh_interval)
211            .map(|(k, _)| *k)
212            .collect();
213
214        for key in stale_keys {
215            if let Some(stored) = dht.get_value(&key) {
216                let key_peer = PeerId::from_bytes(key);
217                let closest = dht.find_closest_peers(&key_peer, self.config.factor);
218                let new_tasks = self.schedule_replication(key, stored.data.clone(), &closest);
219                tasks.extend(new_tasks);
220            }
221        }
222
223        tasks
224    }
225
226    /// Handle the loss of a peer — re-replicate any data it held.
227    pub fn handle_peer_lost(&mut self, lost_peer: &PeerId, dht: &DHTNode) -> Vec<ReplicationTask> {
228        let now = Instant::now();
229        let mut tasks = Vec::new();
230
231        // Find all keys where the lost peer was a replica holder
232        let affected_keys: Vec<[u8; 32]> = self
233            .replicas
234            .iter()
235            .filter(|(_, info)| info.holders.contains(lost_peer))
236            .map(|(k, _)| *k)
237            .collect();
238
239        for key in affected_keys {
240            // Remove the lost peer from holders
241            if let Some(info) = self.replicas.get_mut(&key) {
242                info.holders.remove(lost_peer);
243            }
244
245            // Re-replicate if under-replicated
246            if let Some(stored) = dht.get_value(&key) {
247                let key_peer = PeerId::from_bytes(key);
248                let closest = dht.find_closest_peers(&key_peer, self.config.factor * 2);
249
250                if let Some(info) = self.replicas.get(&key) {
251                    let current = info.holders.len();
252                    let needed = self.config.factor.saturating_sub(current);
253
254                    for peer in closest.iter() {
255                        if !info.holders.contains(&peer.id) && tasks.len() < needed {
256                            let task = ReplicationTask {
257                                key,
258                                value: stored.data.clone(),
259                                target_peer: peer.id,
260                                priority: ReplicationPriority::High,
261                                created_at: now,
262                                deadline: Some(now + Duration::from_secs(30)),
263                            };
264                            tasks.push(task.clone());
265                            self.pending.push_back(task);
266                        }
267                    }
268                }
269            }
270        }
271
272        tasks
273    }
274
275    /// Remove tracking for expired keys.
276    pub fn cleanup_expired(&mut self) -> usize {
277        let now = Instant::now();
278        let before = self.replicas.len();
279        self.replicas.retain(|_, info| {
280            now.duration_since(info.stored_at) < self.config.ttl
281        });
282        before - self.replicas.len()
283    }
284
285    // ── Queries ─────────────────────────────────────────────────────────
286
287    /// How many confirmed replicas exist for a key.
288    pub fn replica_count(&self, key: &[u8; 32]) -> usize {
289        self.replicas.get(key).map(|i| i.holders.len()).unwrap_or(0)
290    }
291
292    /// Whether a key is fully replicated.
293    pub fn is_fully_replicated(&self, key: &[u8; 32]) -> bool {
294        self.replica_count(key) >= self.config.factor
295    }
296
297    /// Get all keys that are under-replicated.
298    pub fn under_replicated_keys(&self) -> Vec<[u8; 32]> {
299        self.replicas
300            .iter()
301            .filter(|(_, info)| info.holders.len() < self.config.factor)
302            .map(|(k, _)| *k)
303            .collect()
304    }
305
306    /// Get the set of peers holding replicas of a key.
307    pub fn holders(&self, key: &[u8; 32]) -> HashSet<PeerId> {
308        self.replicas
309            .get(key)
310            .map(|i| i.holders.clone())
311            .unwrap_or_default()
312    }
313
314    /// Aggregate statistics.
315    pub fn stats(&self) -> ReplicationStats {
316        let total_keys = self.replicas.len();
317        let fully_replicated = self
318            .replicas
319            .values()
320            .filter(|i| i.holders.len() >= self.config.factor)
321            .count();
322        let total_replicas: usize = self.replicas.values().map(|i| i.holders.len()).sum();
323
324        ReplicationStats {
325            total_keys,
326            fully_replicated,
327            under_replicated: total_keys - fully_replicated,
328            pending_tasks: self.pending.len(),
329            total_replicas,
330        }
331    }
332
333    /// Get the replication configuration
334    pub fn config(&self) -> &ReplicationConfig {
335        &self.config
336    }
337}

// ── Tests ───────────────────────────────────────────────────────────────────

#[cfg(test)]
mod tests {
    use super::*;
    use crate::NodeConfig;

    /// Build a deterministic test peer whose id/key bytes all equal `id_byte`.
    fn make_peer(id_byte: u8) -> PeerInfo {
        let addr = format!("/ip4/127.0.0.1/tcp/{}", 9000 + id_byte as u16);
        PeerInfo::new(
            PeerId::from_bytes([id_byte; 32]),
            vec![addr],
            [id_byte; 32],
            [id_byte; 32],
        )
    }

    #[test]
    fn test_schedule_replication() {
        let mut manager = ReplicationManager::with_factor(3);
        let candidates: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        let scheduled = manager.schedule_replication(
            [1u8; 32],
            Bytes::from_static(b"hello world"),
            &candidates,
        );

        assert_eq!(scheduled.len(), 3);
        assert!(scheduled
            .iter()
            .all(|t| t.priority == ReplicationPriority::High));
    }

    #[test]
    fn test_confirm_replication() {
        let mut manager = ReplicationManager::with_factor(3);
        let key = [2u8; 32];
        let candidates: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        let scheduled =
            manager.schedule_replication(key, Bytes::from_static(b"data"), &candidates);
        // Nothing counts as a replica until the transport layer confirms it.
        assert_eq!(manager.replica_count(&key), 0);

        for task in &scheduled {
            manager.confirm_replication(key, task.target_peer);
        }
        assert_eq!(manager.replica_count(&key), 3);
        assert!(manager.is_fully_replicated(&key));
    }

    #[test]
    fn test_handle_peer_lost() {
        let mut manager = ReplicationManager::with_factor(3);
        let dht = DHTNode::new(PeerId::random(), NodeConfig::default());
        let key = [3u8; 32];
        let payload = Bytes::from_static(b"important data");
        let candidates: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        // Populate the routing table.
        for peer in &candidates {
            let _ = dht.add_peer(peer.clone());
        }

        // Store the value locally so re-replication can read it back.
        dht.store_value(key, payload.clone(), [0u8; 32]).unwrap();

        // Schedule and confirm the initial three replicas.
        let scheduled = manager.schedule_replication(key, payload, &candidates);
        for task in &scheduled {
            manager.confirm_replication(key, task.target_peer);
        }
        assert_eq!(manager.replica_count(&key), 3);

        // Losing one holder must produce re-replication work.
        let lost = scheduled[0].target_peer;
        let recovery = manager.handle_peer_lost(&lost, &dht);
        assert!(!recovery.is_empty());
        assert_eq!(manager.replica_count(&key), 2); // lost peer removed
    }

    #[test]
    fn test_under_replicated_keys() {
        let mut manager = ReplicationManager::with_factor(3);
        let key = [4u8; 32];
        let candidates: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        manager.schedule_replication(key, Bytes::from_static(b"partial"), &candidates);
        // Nothing confirmed yet, so the key must show up as under-replicated.
        assert!(manager.under_replicated_keys().contains(&key));
    }

    #[test]
    fn test_stats() {
        let mut manager = ReplicationManager::with_factor(2);
        let candidates: Vec<PeerInfo> = (1..=5).map(make_peer).collect();

        // Key A: every scheduled replica gets confirmed.
        let key_a = [10u8; 32];
        let tasks_a =
            manager.schedule_replication(key_a, Bytes::from_static(b"aaa"), &candidates);
        for task in &tasks_a {
            manager.confirm_replication(key_a, task.target_peer);
        }

        // Key B: only one replica confirmed.
        let key_b = [11u8; 32];
        let tasks_b =
            manager.schedule_replication(key_b, Bytes::from_static(b"bbb"), &candidates);
        manager.confirm_replication(key_b, tasks_b[0].target_peer);

        let stats = manager.stats();
        assert_eq!(stats.total_keys, 2);
        assert_eq!(stats.fully_replicated, 1);
        assert_eq!(stats.under_replicated, 1);
        assert_eq!(stats.total_replicas, 3); // 2 for A + 1 for B
    }

    #[test]
    fn test_take_pending_tasks() {
        let mut manager = ReplicationManager::with_factor(2);
        let candidates: Vec<PeerInfo> = (1..=3).map(make_peer).collect();

        manager.schedule_replication([20u8; 32], Bytes::from_static(b"x"), &candidates);
        assert!(manager.pending_count() > 0);

        let drained = manager.take_pending_tasks();
        assert!(!drained.is_empty());
        assert_eq!(manager.pending_count(), 0);
    }
}