1use std::fmt::{Debug, Display};
8use std::time::Duration;
9
10use futures::channel::mpsc::Sender;
11
12use structopt::StructOpt;
13
14pub mod common;
15use crate::common::*;
16
17pub mod table;
18use table::KNodeTable;
19
20pub mod store;
21use store::HashMapStore;
22
23pub mod dht;
24use dht::Dht;
25
26pub mod prelude;
27
28pub mod mock;
29
30#[derive(PartialEq, Clone, Debug, StructOpt)]
32pub struct Config {
33 #[structopt(long = "dht-bucket-size", default_value = "20")]
34 pub k: usize,
36
37 #[structopt(long = "dht-concurrency", default_value = "4")]
38 pub concurrency: usize,
40
41 #[structopt(long = "dht-recursion-limit", default_value = "10")]
42 pub max_recursion: usize,
44
45 #[structopt(long = "dht-search-timeout", parse(try_from_str = parse_duration), default_value = "2s")]
46 pub search_timeout: Duration,
48
49 #[structopt(long = "dht-node-timeout", parse(try_from_str = parse_duration), default_value = "15m")]
50 pub node_timeout: Duration,
52}
53
54fn parse_duration(s: &str) -> Result<Duration, humantime::DurationError> {
55 use std::str::FromStr;
56 let d = humantime::Duration::from_str(s)?;
57 Ok(d.into())
58}
59
60impl Default for Config {
61 fn default() -> Config {
62 Config {
63 k: 20,
64 concurrency: 4,
65 max_recursion: 10,
66 search_timeout: Duration::from_secs(2),
67 node_timeout: Duration::from_secs(15 * 60 * 60),
68 }
69 }
70}
71
/// A [`Dht`] specialised to the stock backends from this crate: a
/// [`KNodeTable`] as the node table and a [`HashMapStore`] as the data store.
///
/// This is the type returned by `Dht::standard`.
pub type StandardDht<Id, Info, Data, ReqId> =
    Dht<Id, Info, Data, ReqId, KNodeTable<Id, Info>, HashMapStore<Id, Data>>;
75
76impl<Id, Info, Data, ReqId> Dht<Id, Info, Data, ReqId>
77where
78 Id: DatabaseId + Clone + Send + 'static,
79 Info: PartialEq + Clone + Debug + Send + 'static,
80 Data: PartialEq + Clone + Debug + Send + 'static,
81 ReqId: RequestId + Clone + Display + Send + 'static,
82{
83 pub fn standard(
85 id: Id,
86 config: Config,
87 req_sink: Sender<(ReqId, Entry<Id, Info>, Request<Id, Data>)>,
88 ) -> StandardDht<Id, Info, Data, ReqId> {
89 let table = KNodeTable::new(id.clone(), config.k, id.max_bits());
90 let store = HashMapStore::new();
91 Dht::custom(id, config, req_sink, table, store)
92 }
93}
94
// This module is compiled only when both the `nope` and `test` cfgs are
// active. Nothing in this file sets `nope`, so the tests below are presumably
// disabled on purpose.
//
// NOTE(review): the tests build the DHT via `Dht::new(id, config, table,
// connector, store)` and name eight generic parameters, whereas the
// `Dht::standard` constructor in this file takes a `Sender` and forwards to
// `Dht::custom`. It looks like these tests target an earlier connector-based
// API and need porting before the `nope` gate can be removed -- confirm.
#[cfg(nope)]
#[cfg(test)]
mod tests {
    use futures::executor::block_on;
    use std::clone::Clone;

    use super::*;

    use crate::mock::MockSync;
    use crate::store::HashMapStore;
    use crate::table::{KNodeTable, NodeTable};

    use rr_mux::mock::{MockConnector, MockTransaction};
    use rr_mux::Mux;

    // Concrete type parameters shared by the tests below.
    type RequestId = u64;
    type NodeId = [u8; 1];
    type Info = MockSync;
    type Data = MockSync;

    /// Construction-only check: builds a `Mux` and passes it to
    /// `Dht::standard`; the resulting DHT is not exercised.
    #[test]
    fn test_mux() {
        let dht_mux = Mux::<
            RequestId,
            Entry<NodeId, Info>,
            Request<NodeId, Data>,
            Response<NodeId, Info, Data>,
            Error,
            (),
        >::new();

        let n1 = Entry::new([0b0001], MockSync::new(100));
        let _dht = Dht::<NodeId, Info, Data, RequestId, _, _, _, _>::standard(
            n1.id().clone(),
            Config::default(),
            dht_mux,
        );
    }

    /// Connects node n1 to n2 and checks that n2 and the nodes n2 reports
    /// (n3, n4) are all known to the DHT afterwards.
    #[test]
    fn test_connect() {
        let n1 = Entry::new([0b0001], MockSync::new(100));
        let n2 = Entry::new([0b0010], MockSync::new(200));
        let n3 = Entry::new([0b0011], MockSync::new(300));
        let n4 = Entry::new([0b1000], MockSync::new(400));

        // Expected traffic: a FindNode for our own id to n2 (which answers
        // with n3 and n4), then the same query to n3 and n4 (both answer
        // with no further nodes).
        let mut connector = MockConnector::new().expect(vec![
            MockTransaction::request(
                n2.clone(),
                Request::FindNode(n1.id().clone()),
                Ok((
                    Response::NodesFound(n1.id().clone(), vec![n3.clone(), n4.clone()]),
                    (),
                )),
            ),
            MockTransaction::request(
                n3.clone(),
                Request::FindNode(n1.id().clone()),
                Ok((Response::NodesFound(n1.id().clone(), vec![]), ())),
            ),
            MockTransaction::request(
                n4.clone(),
                Request::FindNode(n1.id().clone()),
                Ok((Response::NodesFound(n1.id().clone(), vec![]), ())),
            ),
        ]);

        let mut config = Config::default();
        config.concurrency = 2;

        let knodetable = KNodeTable::new(n1.id().clone(), 2, 4);

        let store: HashMapStore<NodeId, MockSync> = HashMapStore::new();
        let mut dht = Dht::<NodeId, MockSync, _, u64, _, _, _, _>::new(
            n1.id().clone(),
            config,
            knodetable,
            connector.clone(),
            store,
        );

        block_on(dht.connect(n2.clone(), ())).unwrap();

        // The node we connected to is known...
        assert_eq!(Some(n2.clone()), dht.contains(n2.id()));

        // ...and so are the nodes it reported.
        assert_eq!(Some(n3.clone()), dht.contains(n3.id()));
        assert_eq!(Some(n4.clone()), dht.contains(n4.id()));

        // Hand control back to the mock once the run is complete.
        connector.finalise();
    }

    /// Looks up n4's id starting from a table seeded with n2 and n3.
    #[test]
    fn test_lookup() {
        let n1 = Entry::new([0b1000], 100);
        let n2 = Entry::new([0b0011], 200);
        let n3 = Entry::new([0b0010], 300);
        let n4 = Entry::new([0b1001], 400);
        let n5 = Entry::new([0b1010], 400);

        // Expected traffic: FindNode(n4) to the two seeded nodes (n2 reports
        // n4, n3 reports n5), then FindNode(n4) to the newly learned n4 and
        // n5, which report nothing further.
        let mut connector = MockConnector::new().expect(vec![
            MockTransaction::request(
                n2.clone(),
                Request::FindNode(n4.id().clone()),
                Ok((Response::NodesFound(n4.id().clone(), vec![n4.clone()]), ())),
            ),
            MockTransaction::request(
                n3.clone(),
                Request::FindNode(n4.id().clone()),
                Ok((Response::NodesFound(n4.id().clone(), vec![n5.clone()]), ())),
            ),
            MockTransaction::request(
                n4.clone(),
                Request::FindNode(n4.id().clone()),
                Ok((Response::NodesFound(n4.id().clone(), vec![]), ())),
            ),
            MockTransaction::request(
                n5.clone(),
                Request::FindNode(n4.id().clone()),
                Ok((Response::NodesFound(n4.id().clone(), vec![]), ())),
            ),
        ]);

        let mut config = Config::default();
        config.concurrency = 2;
        config.k = 2;

        let mut knodetable = KNodeTable::new(n1.id().clone(), 2, 4);

        // Seed the table with the two initially known nodes.
        knodetable.create_or_update(&n2);
        knodetable.create_or_update(&n3);

        let store: HashMapStore<NodeId, u64> = HashMapStore::new();
        let mut dht = Dht::<NodeId, u64, _, u64, _, _, _, _>::new(
            n1.id().clone(),
            config,
            knodetable,
            connector.clone(),
            store,
        );

        block_on(dht.lookup(n4.id().clone(), ())).expect("lookup failed");

        connector.finalise();
    }

    /// Stores `val` under `id` starting from a table seeded with n2 and n3.
    #[test]
    fn test_store() {
        let n1 = Entry::new([0b1000], 100);
        let n2 = Entry::new([0b0011], 200);
        let n3 = Entry::new([0b0010], 300);
        let n4 = Entry::new([0b1001], 400);
        let n5 = Entry::new([0b1010], 500);

        let id: [u8; 1] = [0b1011];
        let val = vec![1234];

        // Expected traffic: FindNode(id) to the seeded nodes n2 and n3 (which
        // report n4 and n5), FindNode(id) to n5 and n4 (which report nothing
        // further), then a Store(id, val) to n5 and n4.
        let mut connector = MockConnector::new().expect(vec![
            MockTransaction::request(
                n2.clone(),
                Request::FindNode(id),
                Ok((Response::NodesFound(id, vec![n4.clone()]), ())),
            ),
            MockTransaction::request(
                n3.clone(),
                Request::FindNode(id),
                Ok((Response::NodesFound(id, vec![n5.clone()]), ())),
            ),
            MockTransaction::request(
                n5.clone(),
                Request::FindNode(id),
                Ok((Response::NodesFound(id, vec![]), ())),
            ),
            MockTransaction::request(
                n4.clone(),
                Request::FindNode(id),
                Ok((Response::NodesFound(id, vec![]), ())),
            ),
            MockTransaction::request(
                n5.clone(),
                Request::Store(id, val.clone()),
                Ok((Response::NoResult, ())),
            ),
            MockTransaction::request(
                n4.clone(),
                Request::Store(id, val.clone()),
                Ok((Response::NoResult, ())),
            ),
        ]);

        let mut config = Config::default();
        config.concurrency = 2;
        config.k = 2;

        let mut knodetable = KNodeTable::new(n1.id().clone(), 2, 4);

        // Seed the table with the two initially known nodes.
        knodetable.create_or_update(&n2);
        knodetable.create_or_update(&n3);

        let store: HashMapStore<NodeId, u64> = HashMapStore::new();
        let mut dht = Dht::<NodeId, u64, _, u64, _, _, _, _>::new(
            n1.id().clone(),
            config,
            knodetable,
            connector.clone(),
            store,
        );

        block_on(dht.store(id, val, ())).expect("store failed");

        connector.finalise();
    }

    /// Finds the value stored under `id` starting from a table seeded with
    /// n2 and n3.
    #[test]
    fn test_find() {
        let n1 = Entry::new([0b1000], 100);
        let n2 = Entry::new([0b0011], 200);
        let n3 = Entry::new([0b0010], 300);
        let n4 = Entry::new([0b1001], 400);
        let n5 = Entry::new([0b1010], 500);

        let id: [u8; 1] = [0b1011];
        let val = vec![1234];

        // Expected traffic: FindValue(id) to the seeded nodes n2 and n3
        // (which only report the nodes n4 and n5), then FindValue(id) to n5
        // and n4, which both answer with the value.
        let mut connector = MockConnector::new().expect(vec![
            MockTransaction::request(
                n2.clone(),
                Request::FindValue(id),
                Ok((Response::NodesFound(id, vec![n4.clone()]), ())),
            ),
            MockTransaction::request(
                n3.clone(),
                Request::FindValue(id),
                Ok((Response::NodesFound(id, vec![n5.clone()]), ())),
            ),
            MockTransaction::request(
                n5.clone(),
                Request::FindValue(id),
                Ok((Response::ValuesFound(id, val.clone()), ())),
            ),
            MockTransaction::request(
                n4.clone(),
                Request::FindValue(id),
                Ok((Response::ValuesFound(id, val.clone()), ())),
            ),
        ]);

        let mut config = Config::default();
        config.concurrency = 2;
        config.k = 2;

        let mut knodetable = KNodeTable::new(n1.id().clone(), 2, 4);

        // Seed the table with the two initially known nodes.
        knodetable.create_or_update(&n2);
        knodetable.create_or_update(&n3);

        let store: HashMapStore<NodeId, u64> = HashMapStore::new();
        let mut dht = Dht::<NodeId, u64, _, u64, _, _, _, _>::new(
            n1.id().clone(),
            config,
            knodetable,
            connector.clone(),
            store,
        );

        block_on(dht.find(id, ())).expect("find failed");

        connector.finalise();
    }
}