1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
pub mod dht_protocol;
pub mod dht_trait;
pub mod mirror_dht;
pub mod rrdht;

#[cfg(test)]
pub mod tests {
    use crate::{
        dht::{dht_protocol::*, dht_trait::Dht, mirror_dht::MirrorDht, rrdht::RrDht},
        tests::enable_logging_for_test,
    };
    use lib3h_protocol::{
        data_types::{EntryAspectData, EntryData},
        Address,
    };
    use url::Url;

    lazy_static! {
        /// CONSTS
        /// Entries
        pub static ref ENTRY_ADDRESS_1: Address = "entry_addr_1".into();
        pub static ref ENTRY_ADDRESS_2: Address = "entry_addr_2".into();
        pub static ref ENTRY_ADDRESS_3: Address = "entry_addr_3".into();
        /// Aspects
        pub static ref ASPECT_CONTENT_1: Vec<u8> = "hello-1".as_bytes().to_vec();
        pub static ref ASPECT_CONTENT_2: Vec<u8> = "l-2".as_bytes().to_vec();
        pub static ref ASPECT_CONTENT_3: Vec<u8> = "ChainHeader-3".as_bytes().to_vec();
        pub static ref ASPECT_ADDRESS_1: Address = "aspect_addr_1".into();
        pub static ref ASPECT_ADDRESS_2: Address = "aspect_addr_2".into();
        pub static ref ASPECT_ADDRESS_3: Address = "aspect_addr_3".into();
    }

    const PEER_A: &str = "alex";
    const PEER_B: &str = "billy";
    const PEER_C: &str = "camille";

    // Request counters
    #[allow(dead_code)]
    static mut FETCH_COUNT: u32 = 0;

    fn create_test_transport(peer_address: &str) -> Url {
        Url::parse(format!("test://{}", peer_address).as_str()).unwrap()
    }

    #[allow(non_snake_case)]
    fn create_PeerData(peer_address: &str) -> PeerData {
        PeerData {
            peer_address: peer_address.to_owned(),
            peer_uri: create_test_transport(peer_address),
            timestamp: 421,
        }
    }

    #[allow(non_snake_case)]
    fn create_EntryData(
        entry_address: &Address,
        aspect_address: &Address,
        aspect_content: &[u8],
    ) -> EntryData {
        let aspect = EntryAspectData {
            aspect_address: aspect_address.to_owned(),
            type_hint: "dht_test".to_string(),
            aspect: aspect_content.to_owned(),
            publish_ts: 123,
        };
        EntryData {
            entry_address: entry_address.to_owned(),
            aspect_list: vec![aspect],
        }
    }

    #[allow(non_snake_case)]
    #[allow(dead_code)]
    fn create_FetchEntry(entry_address: &Address) -> FetchDhtEntryData {
        unsafe {
            FETCH_COUNT += 1;
            FetchDhtEntryData {
                msg_id: format!("fetch_{}", FETCH_COUNT),
                entry_address: entry_address.to_owned(),
            }
        }
    }

    fn new_dht(is_mirror: bool, peer_address: &str) -> Box<dyn Dht> {
        if is_mirror {
            return Box::new(MirrorDht::new(
                peer_address,
                &create_test_transport(peer_address),
            ));
        }
        Box::new(RrDht::new())
    }

    #[test]
    fn test_this_peer() {
        enable_logging_for_test(true);
        let dht = new_dht(true, PEER_A);
        let this = dht.this_peer();
        assert_eq!(this.peer_address, PEER_A);
    }

    #[test]
    fn test_own_peer_list() {
        enable_logging_for_test(true);
        let mut dht = new_dht(true, PEER_A);
        // Should be empty
        let this = dht.get_peer(PEER_A);
        assert!(this.is_none());
        let peer_list = dht.get_peer_list();
        assert_eq!(peer_list.len(), 0);
        // Add a peer
        dht.post(DhtCommand::HoldPeer(create_PeerData(PEER_B)))
            .unwrap();
        let (did_work, _) = dht.process().unwrap();
        assert!(did_work);
        // Should have it
        let peer = dht.get_peer(PEER_B).unwrap();
        assert_eq!(peer.peer_address, PEER_B);
        let peer_list = dht.get_peer_list();
        assert_eq!(peer_list.len(), 1);
        assert_eq!(peer_list[0].peer_address, PEER_B);
        // Add a peer again
        dht.post(DhtCommand::HoldPeer(create_PeerData(PEER_C)))
            .unwrap();
        let (did_work, _) = dht.process().unwrap();
        assert!(did_work);
        // Should have it
        let peer = dht.get_peer(PEER_B).unwrap();
        assert_eq!(peer.peer_address, PEER_B);
        let peer_list = dht.get_peer_list();
        assert_eq!(peer_list.len(), 2);
    }

    #[test]
    fn test_get_own_entry() {
        enable_logging_for_test(true);
        let mut dht = new_dht(true, PEER_A);
        // Should be empty
        let entry_address_list = dht.get_entry_address_list();
        assert_eq!(entry_address_list.len(), 0);
        // Add a data item
        let entry = create_EntryData(&ENTRY_ADDRESS_1, &ASPECT_ADDRESS_1, &ASPECT_CONTENT_1);
        dht.post(DhtCommand::HoldEntryAspectAddress(entry.clone()))
            .unwrap();
        let (did_work, _) = dht.process().unwrap();
        assert!(did_work);
        // Should have it
        let entry_address_list = dht.get_entry_address_list();
        assert_eq!(entry_address_list.len(), 1);
        let maybe_aspects = dht.get_aspects_of(&ENTRY_ADDRESS_1);
        assert!(maybe_aspects.is_some());
        assert_eq!(maybe_aspects.unwrap().len(), 1);
        // Fetch it
        let fetch_entry = FetchDhtEntryData {
            msg_id: "fetch_1".to_owned(),
            entry_address: ENTRY_ADDRESS_1.clone(),
        };
        dht.post(DhtCommand::FetchEntry(fetch_entry)).unwrap();
        let (_did_work, event_list) = dht.process().unwrap();
        assert_eq!(event_list.len(), 1);
        let provide_entry = unwrap_to!(event_list[0] => DhtEvent::EntryDataRequested);
        // Make something up
        let response = FetchDhtEntryResponseData {
            msg_id: provide_entry.msg_id.clone(),
            entry: entry.clone(),
        };
        dht.post(DhtCommand::EntryDataResponse(response)).unwrap();
        let (_did_work, event_list) = dht.process().unwrap();
        // Should have it
        assert_eq!(event_list.len(), 1);
        let entry_response = unwrap_to!(event_list[0] => DhtEvent::FetchEntryResponse);
        assert_eq!(entry_response.entry, entry);
    }

    #[test]
    fn test_update_peer() {
        enable_logging_for_test(true);
        let mut dht = new_dht(true, PEER_A);
        // Should be empty
        let this = dht.get_peer(PEER_A);
        assert!(this.is_none());
        let peer_list = dht.get_peer_list();
        assert_eq!(peer_list.len(), 0);
        // Add a peer
        let mut peer_b_data = create_PeerData(PEER_B);
        dht.post(DhtCommand::HoldPeer(peer_b_data.clone())).unwrap();
        let (did_work, _) = dht.process().unwrap();
        assert!(did_work);
        // Should have it
        let peer = dht.get_peer(PEER_B).unwrap();
        assert_eq!(peer.peer_address, PEER_B);
        // Add older peer info
        let ref_time = peer_b_data.timestamp;
        peer_b_data.timestamp -= 1;
        dht.post(DhtCommand::HoldPeer(peer_b_data.clone())).unwrap();
        let (did_work, _) = dht.process().unwrap();
        assert!(did_work);
        // Should have unchanged timestamp
        let peer = dht.get_peer(PEER_B).unwrap();
        assert_eq!(peer.timestamp, ref_time);
        // Add newer peer info
        peer_b_data.timestamp = ref_time + 1;
        dht.post(DhtCommand::HoldPeer(peer_b_data)).unwrap();
        let (did_work, _) = dht.process().unwrap();
        assert!(did_work);
        // Should have unchanged timestamp
        let peer = dht.get_peer(PEER_B).unwrap();
        assert!(peer.timestamp > ref_time);
    }

    #[test]
    fn test_mirror_broadcast_entry() {
        enable_logging_for_test(true);
        let mut dht_a = new_dht(true, PEER_A);
        let mut dht_b = new_dht(true, PEER_B);
        // Add a peer
        dht_a
            .post(DhtCommand::HoldPeer(create_PeerData(PEER_B)))
            .unwrap();
        let (did_work, _) = dht_a.process().unwrap();
        assert!(did_work);
        // Add a data item in DHT A
        let entry_data = create_EntryData(&ENTRY_ADDRESS_1, &ASPECT_ADDRESS_1, &ASPECT_CONTENT_1);
        dht_a
            .post(DhtCommand::BroadcastEntry(entry_data.clone()))
            .unwrap();
        let (did_work, gossip_list) = dht_a.process().unwrap();
        assert!(did_work);
        // Should return a gossipTo
        assert_eq!(gossip_list.len(), 1);
        let gossip_to = unwrap_to!(gossip_list[0] => DhtEvent::GossipTo);
        assert_eq!(gossip_to.peer_address_list.len(), 1);
        assert_eq!(gossip_to.peer_address_list[0], PEER_B);
        // Post it as a remoteGossipTo
        let remote_gossip = RemoteGossipBundleData {
            from_peer_address: PEER_A.to_string(),
            bundle: gossip_to.bundle.clone(),
        };
        dht_b.post(DhtCommand::HandleGossip(remote_gossip)).unwrap();
        let (did_work, event_list) = dht_b.process().unwrap();
        assert!(did_work);
        // Should receive a HoldRequested
        assert_eq!(event_list.len(), 1);
        if let DhtEvent::HoldEntryRequested(from, hold_entry) = event_list[0].clone() {
            assert_eq!(from, PEER_B.clone());
            assert_eq!(hold_entry, entry_data.clone());
        } else {
            panic!("Should be of variant type HoldEntryRequested");
        }
        // Tell DHT B to hold it
        dht_b
            .post(DhtCommand::HoldEntryAspectAddress(entry_data))
            .unwrap();
        let (did_work, _) = dht_b.process().unwrap();
        assert!(did_work);
        // DHT B should have the entry
        let entry_list = dht_b.get_entry_address_list();
        assert_eq!(entry_list.len(), 1);
    }

    #[test]
    fn test_mirror_gossip_peer() {
        enable_logging_for_test(true);
        let mut dht_a = new_dht(true, PEER_A);
        let mut dht_b = new_dht(true, PEER_B);
        // Add a peer
        let peer_b_data = dht_b.this_peer();
        dht_a
            .post(DhtCommand::HoldPeer(peer_b_data.clone()))
            .unwrap();
        let (did_work, _) = dht_a.process().unwrap();
        assert!(did_work);
        // Add a second peer
        let peer_c_data = create_PeerData(PEER_C);
        dht_a
            .post(DhtCommand::HoldPeer(peer_c_data.clone()))
            .unwrap();
        let (did_work, gossip_list) = dht_a.process().unwrap();
        assert!(did_work);
        // Should return gossipTos
        println!("gossip_list: {:?}", gossip_list);
        assert_eq!(gossip_list.len(), 2);
        // 2nd gossip should be a response
        let gossip_to = unwrap_to!(gossip_list[1] => DhtEvent::GossipTo);
        assert_eq!(gossip_to.peer_address_list.len(), 1);
        assert_eq!(gossip_to.peer_address_list[0], PEER_C);
        // 1st gossip should be propagation
        let gossip_to = unwrap_to!(gossip_list[0] => DhtEvent::GossipTo);
        assert_eq!(gossip_to.peer_address_list.len(), 1);
        assert_eq!(gossip_to.peer_address_list[0], PEER_B);
        // Post it as a remoteGossipTo
        let remote_gossip = RemoteGossipBundleData {
            from_peer_address: PEER_A.to_string(),
            bundle: gossip_to.bundle.clone(),
        };
        dht_b.post(DhtCommand::HandleGossip(remote_gossip)).unwrap();
        let (did_work, event_list) = dht_b.process().unwrap();
        assert!(did_work);
        println!("event_list: {:?}", event_list);
        assert_eq!(event_list.len(), 1);
        let peer_to_hold = unwrap_to!(event_list[0] => DhtEvent::HoldPeerRequested);
        // Hold requested peer
        dht_b
            .post(DhtCommand::HoldPeer(peer_to_hold.clone()))
            .unwrap();
        let (did_work, _) = dht_b.process().unwrap();
        assert!(did_work);
        // DHT B should have the data
        let peer_info = dht_b.get_peer(PEER_C).unwrap();
        assert_eq!(peer_info, peer_c_data);
    }
}