kad/
lib.rs

1//! rust-kad
2//! A Kademlia DHT implementation in Rust
3//!
4//! https://github.com/ryankurte/rust-kad
5//! Copyright 2018 Ryan Kurte
6
7use std::fmt::{Debug, Display};
8use std::time::Duration;
9
10use futures::channel::mpsc::Sender;
11
12use structopt::StructOpt;
13
14pub mod common;
15use crate::common::*;
16
17pub mod table;
18use table::KNodeTable;
19
20pub mod store;
21use store::HashMapStore;
22
23pub mod dht;
24use dht::Dht;
25
26pub mod prelude;
27
28pub mod mock;
29
30/// DHT Configuration object
31#[derive(PartialEq, Clone, Debug, StructOpt)]
32pub struct Config {
33    #[structopt(long = "dht-bucket-size", default_value = "20")]
34    /// Size of buckets and number of nearby nodes to consider when searching
35    pub k: usize,
36
37    #[structopt(long = "dht-concurrency", default_value = "4")]
38    /// Number of concurrent operations to be performed at once (also known as α or alpha)
39    pub concurrency: usize,
40
41    #[structopt(long = "dht-recursion-limit", default_value = "10")]
42    /// Maximum recursion depth for searches
43    pub max_recursion: usize,
44
45    #[structopt(long = "dht-search-timeout", parse(try_from_str = parse_duration), default_value = "2s")]
46    /// Timeout for search iterations with missing responses
47    pub search_timeout: Duration,
48
49    #[structopt(long = "dht-node-timeout", parse(try_from_str = parse_duration), default_value = "15m")]
50    /// Timeout for no-contact from oldest node (before ping and expiry occurs)
51    pub node_timeout: Duration,
52}
53
54fn parse_duration(s: &str) -> Result<Duration, humantime::DurationError> {
55    use std::str::FromStr;
56    let d = humantime::Duration::from_str(s)?;
57    Ok(d.into())
58}
59
60impl Default for Config {
61    fn default() -> Config {
62        Config {
63            k: 20,
64            concurrency: 4,
65            max_recursion: 10,
66            search_timeout: Duration::from_secs(2),
67            node_timeout: Duration::from_secs(15 * 60 * 60),
68        }
69    }
70}
71
72/// Standard DHT implementation using included KNodeTable and HashMapStore implementations
73pub type StandardDht<Id, Info, Data, ReqId> =
74    Dht<Id, Info, Data, ReqId, KNodeTable<Id, Info>, HashMapStore<Id, Data>>;
75
76impl<Id, Info, Data, ReqId> Dht<Id, Info, Data, ReqId>
77where
78    Id: DatabaseId + Clone + Send + 'static,
79    Info: PartialEq + Clone + Debug + Send + 'static,
80    Data: PartialEq + Clone + Debug + Send + 'static,
81    ReqId: RequestId + Clone + Display + Send + 'static,
82{
83    /// Create a new DHT using the standard node table and data store implementations
84    pub fn standard(
85        id: Id,
86        config: Config,
87        req_sink: Sender<(ReqId, Entry<Id, Info>, Request<Id, Data>)>,
88    ) -> StandardDht<Id, Info, Data, ReqId> {
89        let table = KNodeTable::new(id.clone(), config.k, id.max_bits());
90        let store = HashMapStore::new();
91        Dht::custom(id, config, req_sink, table, store)
92    }
93}
94
95#[cfg(nope)]
96#[cfg(test)]
97mod tests {
98    use futures::executor::block_on;
99    use std::clone::Clone;
100
101    use super::*;
102
103    use crate::mock::MockSync;
104    use crate::store::HashMapStore;
105    use crate::table::{KNodeTable, NodeTable};
106
107    use rr_mux::mock::{MockConnector, MockTransaction};
108    use rr_mux::Mux;
109
110    type RequestId = u64;
111    type NodeId = [u8; 1];
112    type Info = MockSync;
113    type Data = MockSync;
114
115    #[test]
116    fn test_mux() {
117        // Create a generic mux
118        let dht_mux = Mux::<
119            RequestId,
120            Entry<NodeId, Info>,
121            Request<NodeId, Data>,
122            Response<NodeId, Info, Data>,
123            Error,
124            (),
125        >::new();
126
127        // Bind it to the DHT instance
128        let n1 = Entry::new([0b0001], MockSync::new(100));
129        let _dht = Dht::<NodeId, Info, Data, RequestId, _, _, _, _>::standard(
130            n1.id().clone(),
131            Config::default(),
132            dht_mux,
133        );
134    }
135
136    #[test]
137    fn test_connect() {
138        let n1 = Entry::new([0b0001], MockSync::new(100));
139        let n2 = Entry::new([0b0010], MockSync::new(200));
140        let n3 = Entry::new([0b0011], MockSync::new(300));
141        let n4 = Entry::new([0b1000], MockSync::new(400));
142
143        // Build expectations
144        let mut connector = MockConnector::new().expect(vec![
145            // First transaction to bootstrap onto the network
146            MockTransaction::request(
147                n2.clone(),
148                Request::FindNode(n1.id().clone()),
149                Ok((
150                    Response::NodesFound(n1.id().clone(), vec![n3.clone(), n4.clone()]),
151                    (),
152                )),
153            ),
154            //bootsrap to found nodes
155            MockTransaction::request(
156                n3.clone(),
157                Request::FindNode(n1.id().clone()),
158                Ok((Response::NodesFound(n1.id().clone(), vec![]), ())),
159            ),
160            MockTransaction::request(
161                n4.clone(),
162                Request::FindNode(n1.id().clone()),
163                Ok((Response::NodesFound(n1.id().clone(), vec![]), ())),
164            ),
165        ]);
166
167        // Create configuration
168        let mut config = Config::default();
169        config.concurrency = 2;
170
171        let knodetable = KNodeTable::new(n1.id().clone(), 2, 4);
172
173        // Instantiated DHT
174        let store: HashMapStore<NodeId, MockSync> = HashMapStore::new();
175        let mut dht = Dht::<NodeId, MockSync, _, u64, _, _, _, _>::new(
176            n1.id().clone(),
177            config,
178            knodetable,
179            connector.clone(),
180            store,
181        );
182
183        // Attempt initial bootstrapping
184        block_on(dht.connect(n2.clone(), ())).unwrap();
185
186        // Check bootstrapped node is added to db
187        assert_eq!(Some(n2.clone()), dht.contains(n2.id()));
188
189        // Check Reported nodes are added
190        assert_eq!(Some(n3.clone()), dht.contains(n3.id()));
191        assert_eq!(Some(n4.clone()), dht.contains(n4.id()));
192
193        // Check expectations are done
194        connector.finalise();
195    }
196
197    #[test]
198    fn test_lookup() {
199        let n1 = Entry::new([0b1000], 100);
200        let n2 = Entry::new([0b0011], 200);
201        let n3 = Entry::new([0b0010], 300);
202        let n4 = Entry::new([0b1001], 400);
203        let n5 = Entry::new([0b1010], 400);
204
205        // Build expectations
206        let mut connector = MockConnector::new().expect(vec![
207            // First transaction to bootstrap onto the network
208            MockTransaction::request(
209                n2.clone(),
210                Request::FindNode(n4.id().clone()),
211                Ok((Response::NodesFound(n4.id().clone(), vec![n4.clone()]), ())),
212            ),
213            MockTransaction::request(
214                n3.clone(),
215                Request::FindNode(n4.id().clone()),
216                Ok((Response::NodesFound(n4.id().clone(), vec![n5.clone()]), ())),
217            ),
218            // Second iteration
219            MockTransaction::request(
220                n4.clone(),
221                Request::FindNode(n4.id().clone()),
222                Ok((Response::NodesFound(n4.id().clone(), vec![]), ())),
223            ),
224            MockTransaction::request(
225                n5.clone(),
226                Request::FindNode(n4.id().clone()),
227                Ok((Response::NodesFound(n4.id().clone(), vec![]), ())),
228            ),
229        ]);
230
231        // Create configuration
232        let mut config = Config::default();
233        config.concurrency = 2;
234        config.k = 2;
235
236        let mut knodetable = KNodeTable::new(n1.id().clone(), 2, 4);
237
238        // Inject initial nodes into the table
239        knodetable.create_or_update(&n2);
240        knodetable.create_or_update(&n3);
241
242        // Instantiated DHT
243        let store: HashMapStore<NodeId, u64> = HashMapStore::new();
244        let mut dht = Dht::<NodeId, u64, _, u64, _, _, _, _>::new(
245            n1.id().clone(),
246            config,
247            knodetable,
248            connector.clone(),
249            store,
250        );
251
252        // Perform search
253        block_on(dht.lookup(n4.id().clone(), ())).expect("lookup failed");
254
255        connector.finalise();
256    }
257
258    #[test]
259    fn test_store() {
260        let n1 = Entry::new([0b1000], 100);
261        let n2 = Entry::new([0b0011], 200);
262        let n3 = Entry::new([0b0010], 300);
263        let n4 = Entry::new([0b1001], 400);
264        let n5 = Entry::new([0b1010], 500);
265
266        let id: [u8; 1] = [0b1011];
267        let val = vec![1234];
268
269        // Build expectations
270        let mut connector = MockConnector::new().expect(vec![
271            // First transaction to bootstrap onto the network
272            MockTransaction::request(
273                n2.clone(),
274                Request::FindNode(id),
275                Ok((Response::NodesFound(id, vec![n4.clone()]), ())),
276            ),
277            MockTransaction::request(
278                n3.clone(),
279                Request::FindNode(id),
280                Ok((Response::NodesFound(id, vec![n5.clone()]), ())),
281            ),
282            // Second iteration to find k nodes closest to v
283            MockTransaction::request(
284                n5.clone(),
285                Request::FindNode(id),
286                Ok((Response::NodesFound(id, vec![]), ())),
287            ),
288            MockTransaction::request(
289                n4.clone(),
290                Request::FindNode(id),
291                Ok((Response::NodesFound(id, vec![]), ())),
292            ),
293            // Final iteration pushes data to k nodes
294            MockTransaction::request(
295                n5.clone(),
296                Request::Store(id, val.clone()),
297                Ok((Response::NoResult, ())),
298            ),
299            MockTransaction::request(
300                n4.clone(),
301                Request::Store(id, val.clone()),
302                Ok((Response::NoResult, ())),
303            ),
304        ]);
305
306        // Create configuration
307        let mut config = Config::default();
308        config.concurrency = 2;
309        config.k = 2;
310
311        let mut knodetable = KNodeTable::new(n1.id().clone(), 2, 4);
312
313        // Inject initial nodes into the table
314        knodetable.create_or_update(&n2);
315        knodetable.create_or_update(&n3);
316
317        // Instantiated DHT
318        let store: HashMapStore<NodeId, u64> = HashMapStore::new();
319        let mut dht = Dht::<NodeId, u64, _, u64, _, _, _, _>::new(
320            n1.id().clone(),
321            config,
322            knodetable,
323            connector.clone(),
324            store,
325        );
326
327        // Perform store
328        block_on(dht.store(id, val, ())).expect("store failed");
329
330        connector.finalise();
331    }
332
333    #[test]
334    fn test_find() {
335        let n1 = Entry::new([0b1000], 100);
336        let n2 = Entry::new([0b0011], 200);
337        let n3 = Entry::new([0b0010], 300);
338        let n4 = Entry::new([0b1001], 400);
339        let n5 = Entry::new([0b1010], 500);
340
341        let id: [u8; 1] = [0b1011];
342        let val = vec![1234];
343
344        // Build expectations
345        let mut connector = MockConnector::new().expect(vec![
346            // First transaction to bootstrap onto the network
347            MockTransaction::request(
348                n2.clone(),
349                Request::FindValue(id),
350                Ok((Response::NodesFound(id, vec![n4.clone()]), ())),
351            ),
352            MockTransaction::request(
353                n3.clone(),
354                Request::FindValue(id),
355                Ok((Response::NodesFound(id, vec![n5.clone()]), ())),
356            ),
357            // Next iteration gets node data
358            MockTransaction::request(
359                n5.clone(),
360                Request::FindValue(id),
361                Ok((Response::ValuesFound(id, val.clone()), ())),
362            ),
363            MockTransaction::request(
364                n4.clone(),
365                Request::FindValue(id),
366                Ok((Response::ValuesFound(id, val.clone()), ())),
367            ),
368        ]);
369
370        // Create configuration
371        let mut config = Config::default();
372        config.concurrency = 2;
373        config.k = 2;
374
375        let mut knodetable = KNodeTable::new(n1.id().clone(), 2, 4);
376
377        // Inject initial nodes into the table
378        knodetable.create_or_update(&n2);
379        knodetable.create_or_update(&n3);
380
381        // Instantiated DHT
382        let store: HashMapStore<NodeId, u64> = HashMapStore::new();
383        let mut dht = Dht::<NodeId, u64, _, u64, _, _, _, _>::new(
384            n1.id().clone(),
385            config,
386            knodetable,
387            connector.clone(),
388            store,
389        );
390
391        // Perform store
392        block_on(dht.find(id, ())).expect("find failed");
393
394        connector.finalise();
395    }
396}