Skip to main content

shadow_integration_tests/
dht_storage.rs

1//! DHT → Storage integration tests
2
3use shadow_core::{PeerId, PeerInfo};
4
5/// Test DHT with content-addressed storage
6pub fn test_dht_with_content_store() -> Result<(), String> {
7    let peer_id = PeerId::random();
8    let config = dht::NodeConfig::default();
9    let node = dht::DHTNode::new(peer_id, config);
10    let store = storage::ContentStore::default();
11
12    // Store data in content store, get hash
13    let data = b"Important distributed data for the network";
14    let content_hash = store
15        .store(data)
16        .map_err(|e| format!("Content store failed: {}", e))?;
17
18    // Store hash → peer mapping in DHT
19    let publisher = *peer_id.as_bytes();
20    node.store_value(content_hash, bytes::Bytes::from(peer_id.as_bytes().to_vec()), publisher)
21        .map_err(|e| format!("DHT store failed: {}", e))?;
22
23    // Retrieve from DHT
24    let dht_value = node
25        .get_value(&content_hash)
26        .ok_or("DHT retrieve returned None")?;
27
28    if dht_value.data.as_ref() != peer_id.as_bytes() {
29        return Err("DHT value mismatch".into());
30    }
31
32    // Retrieve original data from content store
33    let retrieved = store
34        .retrieve(&content_hash)
35        .map_err(|e| format!("Content retrieve failed: {}", e))?;
36
37    if retrieved.as_ref() != data {
38        return Err("Content mismatch".into());
39    }
40
41    Ok(())
42}
43
44/// Test replication with peer loss simulation
45pub fn test_replication_peer_loss() -> Result<(), String> {
46    let config = dht::replication::ReplicationConfig::default();
47    let mut manager = dht::replication::ReplicationManager::new(config);
48
49    let key = [0xAA; 32];
50    let value = bytes::Bytes::from(vec![1, 2, 3, 4, 5]);
51    let peers: Vec<PeerInfo> = (0..5)
52        .map(|i| {
53            PeerInfo::new(
54                PeerId::random(),
55                vec![format!("127.0.0.1:{}", 9000 + i)],
56                [0u8; 32],
57                [0u8; 32],
58            )
59        })
60        .collect();
61
62    // Schedule initial replication
63    let tasks = manager.schedule_replication(key, value, &peers);
64
65    // Confirm replicas
66    for task in &tasks {
67        manager.confirm_replication(key, task.target_peer);
68    }
69
70    if !manager.is_fully_replicated(&key) {
71        return Err("Should be fully replicated".into());
72    }
73
74    Ok(())
75}
76
77#[cfg(test)]
78mod tests {
79    use super::*;
80
81    #[test]
82    fn test_dht_storage_integration() {
83        test_dht_with_content_store().unwrap();
84    }
85
86    #[test]
87    fn test_replication() {
88        test_replication_peer_loss().unwrap();
89    }
90
91    #[test]
92    fn test_chunked_storage_in_dht() {
93        let peer_id = PeerId::random();
94        let node = dht::DHTNode::new(peer_id, dht::NodeConfig::default());
95
96        let chunker = storage::Chunker::new(256);
97        let data = vec![0xABu8; 1024]; // 1KB data
98
99        // Chunk the data
100        let chunks = chunker.chunk(&data).unwrap();
101        assert!(chunks.len() > 1);
102
103        // Store each chunk in DHT
104        let publisher = [0u8; 32];
105        for (info, chunk_data) in &chunks {
106            node.store_value(info.hash, chunk_data.clone(), publisher).unwrap();
107        }
108
109        // Retrieve and reassemble
110        let mut retrieved_chunks = Vec::new();
111        for (info, _) in &chunks {
112            let stored = node.get_value(&info.hash).unwrap();
113            retrieved_chunks.push((info.clone(), stored.data.clone()));
114        }
115
116        // Sort by index for reassembly
117        retrieved_chunks.sort_by_key(|(info, _)| info.index);
118        let reassembled = chunker.reassemble(&retrieved_chunks).unwrap();
119        assert_eq!(reassembled.as_ref(), data.as_slice());
120    }
121
122    #[test]
123    fn test_erasure_coded_dht_storage() {
124        let encoder = storage::ErasureEncoder::new(3, 2).unwrap();
125        let data = vec![0xCDu8; 768]; // 3 * 256
126
127        // Encode with erasure coding
128        let shards = encoder.encode(&data).unwrap();
129        assert_eq!(shards.len(), 5); // 3 data + 2 parity
130
131        // Store each shard in DHT
132        let peer_id = PeerId::random();
133        let node = dht::DHTNode::new(peer_id, dht::NodeConfig::default());
134        let publisher = [0u8; 32];
135
136        let mut shard_hashes = Vec::new();
137        for shard in &shards {
138            let hash = crypto::hash_data(shard);
139            node.store_value(*hash.as_bytes(), shard.clone(), publisher).unwrap();
140            shard_hashes.push(*hash.as_bytes());
141        }
142
143        // Retrieve all shards
144        let retrieved: Vec<Option<bytes::Bytes>> = shard_hashes
145            .iter()
146            .map(|h| node.get_value(h).map(|v| v.data.clone()))
147            .collect();
148
149        // Reconstruct from shards
150        let original_size = data.len();
151        let reconstructed = encoder.decode(&retrieved, original_size).unwrap();
152        assert_eq!(reconstructed.as_ref(), data.as_slice());
153    }
154
155    #[test]
156    fn test_dht_with_routing() {
157        let local_id = PeerId::random();
158        let mut table = dht::RoutingTable::new(local_id, 20);
159
160        // Add peers
161        let peers: Vec<PeerInfo> = (0..50)
162            .map(|i| {
163                PeerInfo::new(
164                    PeerId::random(),
165                    vec![format!("127.0.0.1:{}", 8000 + i)],
166                    [0u8; 32],
167                    [0u8; 32],
168                )
169            })
170            .collect();
171
172        for peer in &peers {
173            let _ = table.add_peer(peer.clone());
174        }
175
176        // Find closest to a random target
177        let target = PeerId::random();
178        let closest = table.find_closest(&target, 10);
179        assert!(!closest.is_empty());
180        assert!(closest.len() <= 10);
181    }
182}